mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
52ed3e0896
Updates swarmkit, grpc, and all related vendors Signed-off-by: Derek McGowan <derek@mcgstyle.net>
344 lines
7.7 KiB
Go
344 lines
7.7 KiB
Go
// +build !windows
|
|
|
|
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/BurntSushi/toml"
|
|
"github.com/containerd/containerd"
|
|
"github.com/containerd/containerd/services/server"
|
|
"github.com/docker/docker/pkg/system"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
|
|
maxConnectionRetryCount = 3
|
|
healthCheckTimeout = 3 * time.Second
|
|
shutdownTimeout = 15 * time.Second
|
|
configFile = "containerd.toml"
|
|
binaryName = "docker-containerd"
|
|
pidFile = "docker-containerd.pid"
|
|
)
|
|
|
|
type pluginConfigs struct {
|
|
Plugins map[string]interface{} `toml:"plugins"`
|
|
}
|
|
|
|
type remote struct {
|
|
sync.RWMutex
|
|
server.Config
|
|
|
|
daemonPid int
|
|
logger *logrus.Entry
|
|
|
|
daemonWaitCh chan struct{}
|
|
clients []*client
|
|
shutdownContext context.Context
|
|
shutdownCancel context.CancelFunc
|
|
shutdown bool
|
|
|
|
// Options
|
|
startDaemon bool
|
|
rootDir string
|
|
stateDir string
|
|
snapshotter string
|
|
pluginConfs pluginConfigs
|
|
}
|
|
|
|
// New creates a fresh instance of libcontainerd remote.
|
|
func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) {
|
|
defer func() {
|
|
if err != nil {
|
|
err = errors.Wrap(err, "Failed to connect to containerd")
|
|
}
|
|
}()
|
|
|
|
r := &remote{
|
|
rootDir: rootDir,
|
|
stateDir: stateDir,
|
|
Config: server.Config{
|
|
Root: filepath.Join(rootDir, "daemon"),
|
|
State: filepath.Join(stateDir, "daemon"),
|
|
},
|
|
pluginConfs: pluginConfigs{make(map[string]interface{})},
|
|
daemonPid: -1,
|
|
logger: logrus.WithField("module", "libcontainerd"),
|
|
}
|
|
r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background())
|
|
|
|
rem = r
|
|
for _, option := range options {
|
|
if err = option.Apply(r); err != nil {
|
|
return
|
|
}
|
|
}
|
|
r.setDefaults()
|
|
|
|
if err = system.MkdirAll(stateDir, 0700, ""); err != nil {
|
|
return
|
|
}
|
|
|
|
if r.startDaemon {
|
|
os.Remove(r.GRPC.Address)
|
|
if err = r.startContainerd(); err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
r.Cleanup()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// This connection is just used to monitor the connection
|
|
client, err := containerd.New(r.GRPC.Address)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if _, err := client.Version(context.Background()); err != nil {
|
|
system.KillProcess(r.daemonPid)
|
|
return nil, errors.Wrapf(err, "unable to get containerd version")
|
|
}
|
|
|
|
go r.monitorConnection(client)
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
|
|
c := &client{
|
|
stateDir: r.stateDir,
|
|
logger: r.logger.WithField("namespace", ns),
|
|
namespace: ns,
|
|
backend: b,
|
|
containers: make(map[string]*container),
|
|
}
|
|
|
|
rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.remote = rclient
|
|
|
|
go c.processEventStream(r.shutdownContext)
|
|
|
|
r.Lock()
|
|
r.clients = append(r.clients, c)
|
|
r.Unlock()
|
|
return c, nil
|
|
}
|
|
|
|
func (r *remote) Cleanup() {
|
|
if r.daemonPid != -1 {
|
|
r.shutdownCancel()
|
|
r.stopDaemon()
|
|
}
|
|
|
|
// cleanup some files
|
|
os.Remove(filepath.Join(r.stateDir, pidFile))
|
|
|
|
r.platformCleanup()
|
|
}
|
|
|
|
func (r *remote) getContainerdPid() (int, error) {
|
|
pidFile := filepath.Join(r.stateDir, pidFile)
|
|
f, err := os.OpenFile(pidFile, os.O_RDWR, 0600)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return -1, nil
|
|
}
|
|
return -1, err
|
|
}
|
|
defer f.Close()
|
|
|
|
b := make([]byte, 8)
|
|
n, err := f.Read(b)
|
|
if err != nil && err != io.EOF {
|
|
return -1, err
|
|
}
|
|
|
|
if n > 0 {
|
|
pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
if system.IsProcessAlive(int(pid)) {
|
|
return int(pid), nil
|
|
}
|
|
}
|
|
|
|
return -1, nil
|
|
}
|
|
|
|
func (r *remote) getContainerdConfig() (string, error) {
|
|
path := filepath.Join(r.stateDir, configFile)
|
|
f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
|
|
if err != nil {
|
|
return "", errors.Wrapf(err, "failed to open containerd config file at %s", path)
|
|
}
|
|
defer f.Close()
|
|
|
|
enc := toml.NewEncoder(f)
|
|
if err = enc.Encode(r.Config); err != nil {
|
|
return "", errors.Wrapf(err, "failed to encode general config")
|
|
}
|
|
if err = enc.Encode(r.pluginConfs); err != nil {
|
|
return "", errors.Wrapf(err, "failed to encode plugin configs")
|
|
}
|
|
|
|
return path, nil
|
|
}
|
|
|
|
func (r *remote) startContainerd() error {
|
|
pid, err := r.getContainerdPid()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if pid != -1 {
|
|
r.daemonPid = pid
|
|
logrus.WithField("pid", pid).
|
|
Infof("libcontainerd: %s is still running", binaryName)
|
|
return nil
|
|
}
|
|
|
|
configFile, err := r.getContainerdConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
args := []string{"--config", configFile}
|
|
cmd := exec.Command(binaryName, args...)
|
|
// redirect containerd logs to docker logs
|
|
cmd.Stdout = os.Stdout
|
|
cmd.Stderr = os.Stderr
|
|
cmd.SysProcAttr = containerdSysProcAttr()
|
|
// clear the NOTIFY_SOCKET from the env when starting containerd
|
|
cmd.Env = nil
|
|
for _, e := range os.Environ() {
|
|
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
|
|
cmd.Env = append(cmd.Env, e)
|
|
}
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.daemonWaitCh = make(chan struct{})
|
|
go func() {
|
|
// Reap our child when needed
|
|
if err := cmd.Wait(); err != nil {
|
|
r.logger.WithError(err).Errorf("containerd did not exit successfully")
|
|
}
|
|
close(r.daemonWaitCh)
|
|
}()
|
|
|
|
r.daemonPid = cmd.Process.Pid
|
|
|
|
err = ioutil.WriteFile(filepath.Join(r.stateDir, pidFile), []byte(fmt.Sprintf("%d", r.daemonPid)), 0660)
|
|
if err != nil {
|
|
system.KillProcess(r.daemonPid)
|
|
return errors.Wrap(err, "libcontainerd: failed to save daemon pid to disk")
|
|
}
|
|
|
|
logrus.WithField("pid", r.daemonPid).
|
|
Infof("libcontainerd: started new %s process", binaryName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *remote) monitorConnection(monitor *containerd.Client) {
|
|
var transientFailureCount = 0
|
|
|
|
for {
|
|
select {
|
|
case <-r.shutdownContext.Done():
|
|
r.logger.Info("stopping healthcheck following graceful shutdown")
|
|
monitor.Close()
|
|
return
|
|
case <-time.After(500 * time.Millisecond):
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
|
|
_, err := monitor.IsServing(ctx)
|
|
cancel()
|
|
if err == nil {
|
|
transientFailureCount = 0
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-r.shutdownContext.Done():
|
|
r.logger.Info("stopping healthcheck following graceful shutdown")
|
|
monitor.Close()
|
|
return
|
|
default:
|
|
}
|
|
|
|
r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
|
|
|
|
if r.daemonPid == -1 {
|
|
continue
|
|
}
|
|
|
|
transientFailureCount++
|
|
if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
|
|
continue
|
|
}
|
|
|
|
transientFailureCount = 0
|
|
if system.IsProcessAlive(r.daemonPid) {
|
|
r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
|
|
// Try to get a stack trace
|
|
syscall.Kill(r.daemonPid, syscall.SIGUSR1)
|
|
<-time.After(100 * time.Millisecond)
|
|
system.KillProcess(r.daemonPid)
|
|
}
|
|
if r.daemonWaitCh != nil {
|
|
<-r.daemonWaitCh
|
|
}
|
|
|
|
os.Remove(r.GRPC.Address)
|
|
if err := r.startContainerd(); err != nil {
|
|
r.logger.WithError(err).Error("failed restarting containerd")
|
|
continue
|
|
}
|
|
|
|
if err := monitor.Reconnect(); err != nil {
|
|
r.logger.WithError(err).Error("failed connect to containerd")
|
|
continue
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for _, c := range r.clients {
|
|
wg.Add(1)
|
|
|
|
go func(c *client) {
|
|
defer wg.Done()
|
|
c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
|
|
if err := c.reconnect(); err != nil {
|
|
r.logger.WithError(err).Error("failed to connect to containerd")
|
|
// TODO: Better way to handle this?
|
|
// This *shouldn't* happen, but this could wind up where the daemon
|
|
// is not able to communicate with an eventually up containerd
|
|
}
|
|
}(c)
|
|
|
|
wg.Wait()
|
|
}
|
|
}
|
|
}
|