Merge pull request #35809 from cpuguy83/fix_container_zombies

Fix race conditions in libcontainerd process handling
This commit is contained in:
Sebastiaan van Stijn 2017-12-16 01:16:50 -08:00 committed by GitHub
commit 52656da950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 107 additions and 71 deletions

View File

@ -4,10 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"runtime" "runtime"
"strings"
"syscall" "syscall"
"time" "time"
"github.com/docker/docker/api/errdefs"
containerpkg "github.com/docker/docker/container" containerpkg "github.com/docker/docker/container"
"github.com/docker/docker/libcontainerd" "github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/signal"
@ -97,15 +97,11 @@ func (daemon *Daemon) killWithSignal(container *containerpkg.Container, sig int)
} }
if err := daemon.kill(container, sig); err != nil { if err := daemon.kill(container, sig); err != nil {
err = errors.Wrapf(err, "Cannot kill container %s", container.ID) if errdefs.IsNotFound(err) {
// if container or process not exists, ignore the error
// TODO: we shouldn't have to parse error strings from containerd
if strings.Contains(err.Error(), "container not found") ||
strings.Contains(err.Error(), "no such process") {
logrus.Warnf("container kill failed because of 'container not found' or 'no such process': %s", err.Error())
unpause = false unpause = false
logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'")
} else { } else {
return err return errors.Wrapf(err, "Cannot kill container %s", container.ID)
} }
} }
@ -171,7 +167,7 @@ func (daemon *Daemon) Kill(container *containerpkg.Container) error {
// killPossibleDeadProcess is a wrapper around killSig() suppressing "no such process" error. // killPossibleDeadProcess is a wrapper around killSig() suppressing "no such process" error.
func (daemon *Daemon) killPossiblyDeadProcess(container *containerpkg.Container, sig int) error { func (daemon *Daemon) killPossiblyDeadProcess(container *containerpkg.Container, sig int) error {
err := daemon.killWithSignal(container, sig) err := daemon.killWithSignal(container, sig)
if err == syscall.ESRCH { if errdefs.IsNotFound(err) {
e := errNoSuchProcess{container.GetPID(), sig} e := errNoSuchProcess{container.GetPID(), sig}
logrus.Debug(e) logrus.Debug(e)
return e return e

View File

@ -27,6 +27,7 @@ import (
"github.com/containerd/containerd/archive" "github.com/containerd/containerd/archive"
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
@ -42,7 +43,7 @@ import (
const InitProcessName = "init" const InitProcessName = "init"
type container struct { type container struct {
sync.Mutex mu sync.Mutex
bundleDir string bundleDir string
ctr containerd.Container ctr containerd.Container
@ -51,6 +52,54 @@ type container struct {
oomKilled bool oomKilled bool
} }
func (c *container) setTask(t containerd.Task) {
c.mu.Lock()
c.task = t
c.mu.Unlock()
}
func (c *container) getTask() containerd.Task {
c.mu.Lock()
t := c.task
c.mu.Unlock()
return t
}
func (c *container) addProcess(id string, p containerd.Process) {
c.mu.Lock()
if c.execs == nil {
c.execs = make(map[string]containerd.Process)
}
c.execs[id] = p
c.mu.Unlock()
}
func (c *container) deleteProcess(id string) {
c.mu.Lock()
delete(c.execs, id)
c.mu.Unlock()
}
func (c *container) getProcess(id string) containerd.Process {
c.mu.Lock()
p := c.execs[id]
c.mu.Unlock()
return p
}
func (c *container) setOOMKilled(killed bool) {
c.mu.Lock()
c.oomKilled = killed
c.mu.Unlock()
}
func (c *container) getOOMKilled() bool {
c.mu.Lock()
killed := c.oomKilled
c.mu.Unlock()
return killed
}
type client struct { type client struct {
sync.RWMutex // protects containers map sync.RWMutex // protects containers map
@ -160,10 +209,10 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
// Start create and start a task for the specified containerd id // Start create and start a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) { func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(id) ctr := c.getContainer(id)
switch { if ctr == nil {
case ctr == nil:
return -1, errors.WithStack(newNotFoundError("no such container")) return -1, errors.WithStack(newNotFoundError("no such container"))
case ctr.task != nil: }
if t := ctr.getTask(); t != nil {
return -1, errors.WithStack(newConflictError("container already started")) return -1, errors.WithStack(newConflictError("container already started"))
} }
@ -227,9 +276,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
return -1, err return -1, err
} }
c.Lock() ctr.setTask(t)
c.containers[id].task = t
c.Unlock()
// Signal c.createIO that it can call CloseIO // Signal c.createIO that it can call CloseIO
close(stdinCloseSync) close(stdinCloseSync)
@ -239,9 +286,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
c.logger.WithError(err).WithField("container", id). c.logger.WithError(err).WithField("container", id).
Error("failed to delete task after fail start") Error("failed to delete task after fail start")
} }
c.Lock() ctr.setTask(nil)
c.containers[id].task = nil
c.Unlock()
return -1, err return -1, err
} }
@ -250,12 +295,15 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) { func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(containerID) ctr := c.getContainer(containerID)
switch { if ctr == nil {
case ctr == nil:
return -1, errors.WithStack(newNotFoundError("no such container")) return -1, errors.WithStack(newNotFoundError("no such container"))
case ctr.task == nil: }
t := ctr.getTask()
if t == nil {
return -1, errors.WithStack(newInvalidParameterError("container is not running")) return -1, errors.WithStack(newInvalidParameterError("container is not running"))
case ctr.execs != nil && ctr.execs[processID] != nil: }
if p := ctr.getProcess(processID); p != nil {
return -1, errors.WithStack(newConflictError("id already in use")) return -1, errors.WithStack(newConflictError("id already in use"))
} }
@ -278,7 +326,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
} }
}() }()
p, err = ctr.task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) { p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio) rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
return rio, err return rio, err
}) })
@ -291,21 +339,14 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
return -1, err return -1, err
} }
ctr.Lock() ctr.addProcess(processID, p)
if ctr.execs == nil {
ctr.execs = make(map[string]containerd.Process)
}
ctr.execs[processID] = p
ctr.Unlock()
// Signal c.createIO that it can call CloseIO // Signal c.createIO that it can call CloseIO
close(stdinCloseSync) close(stdinCloseSync)
if err = p.Start(ctx); err != nil { if err = p.Start(ctx); err != nil {
p.Delete(context.Background()) p.Delete(context.Background())
ctr.Lock() ctr.deleteProcess(processID)
delete(ctr.execs, processID)
ctr.Unlock()
return -1, err return -1, err
} }
@ -317,7 +358,7 @@ func (c *client) SignalProcess(ctx context.Context, containerID, processID strin
if err != nil { if err != nil {
return err return err
} }
return p.Kill(ctx, syscall.Signal(signal)) return wrapError(p.Kill(ctx, syscall.Signal(signal)))
} }
func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error { func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
@ -431,12 +472,9 @@ func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, ti
return 255, time.Now(), nil return 255, time.Now(), nil
} }
c.Lock() if ctr := c.getContainer(containerID); ctr != nil {
if ctr, ok := c.containers[containerID]; ok { ctr.setTask(nil)
ctr.task = nil
} }
c.Unlock()
return status.ExitCode(), status.ExitTime(), nil return status.ExitCode(), status.ExitTime(), nil
} }
@ -470,7 +508,12 @@ func (c *client) Status(ctx context.Context, containerID string) (Status, error)
return StatusUnknown, errors.WithStack(newNotFoundError("no such container")) return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
} }
s, err := ctr.task.Status(ctx) t := ctr.getTask()
if t == nil {
return StatusUnknown, errors.WithStack(newNotFoundError("no such task"))
}
s, err := t.Status(ctx)
if err != nil { if err != nil {
return StatusUnknown, err return StatusUnknown, err
} }
@ -546,26 +589,22 @@ func (c *client) removeContainer(id string) {
func (c *client) getProcess(containerID, processID string) (containerd.Process, error) { func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
ctr := c.getContainer(containerID) ctr := c.getContainer(containerID)
switch { if ctr == nil {
case ctr == nil:
return nil, errors.WithStack(newNotFoundError("no such container")) return nil, errors.WithStack(newNotFoundError("no such container"))
case ctr.task == nil:
return nil, errors.WithStack(newNotFoundError("container is not running"))
case processID == InitProcessName:
return ctr.task, nil
default:
ctr.Lock()
defer ctr.Unlock()
if ctr.execs == nil {
return nil, errors.WithStack(newNotFoundError("no execs"))
}
} }
p := ctr.execs[processID] t := ctr.getTask()
if t == nil {
return nil, errors.WithStack(newNotFoundError("container is not running"))
}
if processID == InitProcessName {
return t, nil
}
p := ctr.getProcess(processID)
if p == nil { if p == nil {
return nil, errors.WithStack(newNotFoundError("no such exec")) return nil, errors.WithStack(newNotFoundError("no such exec"))
} }
return p, nil return p, nil
} }
@ -623,12 +662,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
} }
if et == EventExit && ei.ProcessID != ei.ContainerID { if et == EventExit && ei.ProcessID != ei.ContainerID {
var p containerd.Process p := ctr.getProcess(ei.ProcessID)
ctr.Lock()
if ctr.execs != nil {
p = ctr.execs[ei.ProcessID]
}
ctr.Unlock()
if p == nil { if p == nil {
c.logger.WithError(errors.New("no such process")). c.logger.WithError(errors.New("no such process")).
WithFields(logrus.Fields{ WithFields(logrus.Fields{
@ -644,9 +678,8 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
"process": ei.ProcessID, "process": ei.ProcessID,
}).Warn("failed to delete process") }).Warn("failed to delete process")
} }
c.Lock() ctr.deleteProcess(ei.ProcessID)
delete(ctr.execs, ei.ProcessID)
c.Unlock()
ctr := c.getContainer(ei.ContainerID) ctr := c.getContainer(ei.ContainerID)
if ctr == nil { if ctr == nil {
c.logger.WithFields(logrus.Fields{ c.logger.WithFields(logrus.Fields{
@ -783,10 +816,10 @@ func (c *client) processEventStream(ctx context.Context) {
} }
if oomKilled { if oomKilled {
ctr.oomKilled = true ctr.setOOMKilled(true)
oomKilled = false oomKilled = false
} }
ei.OOMKilled = ctr.oomKilled ei.OOMKilled = ctr.getOOMKilled()
c.processEvent(ctr, et, ei) c.processEvent(ctr, et, ei)
} }
@ -816,12 +849,19 @@ func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.R
} }
func wrapError(err error) error { func wrapError(err error) error {
if err != nil { if err == nil {
msg := err.Error() return nil
for _, s := range []string{"container does not exist", "not found", "no such container"} { }
if strings.Contains(msg, s) {
return wrapNotFoundError(err) switch {
} case errdefs.IsNotFound(err):
return wrapNotFoundError(err)
}
msg := err.Error()
for _, s := range []string{"container does not exist", "not found", "no such container"} {
if strings.Contains(msg, s) {
return wrapNotFoundError(err)
} }
} }
return err return err