From dd2e19ebd58cd8896d79b4b8db61144b93717b33 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 23 May 2018 12:15:21 -0700 Subject: [PATCH] libcontainerd: split client and supervisor Adds a supervisor package for starting and monitoring containerd. Separates grpc connection allowing access from daemon. Signed-off-by: Derek McGowan --- cmd/dockerd/daemon.go | 68 +++-- cmd/dockerd/daemon_unix.go | 23 +- cmd/dockerd/daemon_windows.go | 4 +- daemon/daemon.go | 48 +++- libcontainerd/client_daemon.go | 56 ++--- libcontainerd/client_local_windows.go | 22 ++ libcontainerd/remote_daemon_options.go | 142 ----------- libcontainerd/remote_daemon_options_linux.go | 18 -- libcontainerd/remote_local.go | 59 ----- .../{ => supervisor}/remote_daemon.go | 238 ++++++++---------- .../{ => supervisor}/remote_daemon_linux.go | 13 +- .../supervisor/remote_daemon_options.go | 55 ++++ .../supervisor/remote_daemon_options_linux.go | 9 + .../{ => supervisor}/remote_daemon_windows.go | 13 +- libcontainerd/{ => supervisor}/utils_linux.go | 2 +- libcontainerd/supervisor/utils_windows.go | 9 + libcontainerd/types.go | 17 -- libcontainerd/utils_windows.go | 8 - plugin/executor/containerd/containerd.go | 10 +- 19 files changed, 339 insertions(+), 475 deletions(-) delete mode 100644 libcontainerd/remote_daemon_options.go delete mode 100644 libcontainerd/remote_daemon_options_linux.go delete mode 100644 libcontainerd/remote_local.go rename libcontainerd/{ => supervisor}/remote_daemon.go (55%) rename libcontainerd/{ => supervisor}/remote_daemon_linux.go (84%) create mode 100644 libcontainerd/supervisor/remote_daemon_options.go create mode 100644 libcontainerd/supervisor/remote_daemon_options_linux.go rename libcontainerd/{ => supervisor}/remote_daemon_windows.go (79%) rename libcontainerd/{ => supervisor}/utils_linux.go (74%) create mode 100644 libcontainerd/supervisor/utils_windows.go diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index efefaa1ac3..0096a557d5 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -36,7 +36,7 @@ import ( "github.com/docker/docker/daemon/config" "github.com/docker/docker/daemon/listeners" "github.com/docker/docker/dockerversion" - "github.com/docker/docker/libcontainerd" + "github.com/docker/docker/libcontainerd/supervisor" dopts "github.com/docker/docker/opts" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/jsonmessage" @@ -45,7 +45,6 @@ import ( "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/system" "github.com/docker/docker/plugin" - "github.com/docker/docker/registry" "github.com/docker/docker/runconfig" "github.com/docker/go-connections/tlsconfig" swarmapi "github.com/docker/swarmkit/api" @@ -112,6 +111,10 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { return err } + if err := system.MkdirAll(cli.Config.ExecRoot, 0700, ""); err != nil { + return err + } + if cli.Pidfile != "" { pf, err := pidfile.New(cli.Pidfile) if err != nil { @@ -135,19 +138,27 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { return fmt.Errorf("Failed to load listeners: %v", err) } - registryService, err := registry.NewService(cli.Config.ServiceOptions) - if err != nil { - return err - } + ctx, cancel := context.WithCancel(context.Background()) + if cli.Config.ContainerdAddr == "" && runtime.GOOS != "windows" { + opts, err := cli.getContainerdDaemonOpts() + if err != nil { + cancel() + return fmt.Errorf("Failed to generate containerd options: %v", err) + } - rOpts, err := cli.getRemoteOptions() - if err != nil { - return fmt.Errorf("Failed to generate containerd options: %v", err) - } - containerdRemote, err := libcontainerd.New(filepath.Join(cli.Config.Root, "containerd"), filepath.Join(cli.Config.ExecRoot, "containerd"), rOpts...) - if err != nil { - return err + r, err := supervisor.Start(ctx, filepath.Join(cli.Config.Root, "containerd"), filepath.Join(cli.Config.ExecRoot, "containerd"), opts...) + if err != nil { + cancel() + return fmt.Errorf("Failed to start containerd: %v", err) + } + + cli.Config.ContainerdAddr = r.Address() + + // Try to wait for containerd to shutdown + defer r.WaitTimeout(10 * time.Second) } + defer cancel() + signal.Trap(func() { cli.stop() <-stopc // wait for daemonCli.start() to return @@ -162,7 +173,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { logrus.Fatalf("Error creating middlewares: %v", err) } - d, err := daemon.NewDaemon(cli.Config, registryService, containerdRemote, pluginStore) + d, err := daemon.NewDaemon(ctx, cli.Config, pluginStore) if err != nil { return fmt.Errorf("Error starting daemon: %v", err) } @@ -207,10 +218,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { initRouter(routerOptions) - // process cluster change notifications - watchCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - go d.ProcessClusterNotifications(watchCtx, c.GetWatchStream()) + go d.ProcessClusterNotifications(ctx, c.GetWatchStream()) cli.setupConfigReloadTrap() @@ -227,8 +235,12 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { // Wait for serve API to complete errAPI := <-serveAPIWait c.Cleanup() + shutdownDaemon(d) - containerdRemote.Cleanup() + + // Stop notification processing and any background processes + cancel() + if errAPI != nil { return fmt.Errorf("Shutting down due to ServeAPI error: %v", errAPI) } @@ -511,14 +523,22 @@ func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config return nil } -func (cli *DaemonCli) getRemoteOptions() ([]libcontainerd.RemoteOption, error) { - opts := []libcontainerd.RemoteOption{} - - pOpts, err := cli.getPlatformRemoteOptions() +func (cli *DaemonCli) getContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) { + opts, err := cli.getPlatformContainerdDaemonOpts() if err != nil { return nil, err } - opts = append(opts, pOpts...) + + if cli.Config.Debug { + opts = append(opts, supervisor.WithLogLevel("debug")) + } else if cli.Config.LogLevel != "" { + opts = append(opts, supervisor.WithLogLevel(cli.Config.LogLevel)) + } + + if !cli.Config.CriContainerd { + opts = append(opts, supervisor.WithPlugin("cri", nil)) + } + return opts, nil } diff --git a/cmd/dockerd/daemon_unix.go b/cmd/dockerd/daemon_unix.go index 51e56673a0..5798c50521 100644 --- a/cmd/dockerd/daemon_unix.go +++ b/cmd/dockerd/daemon_unix.go @@ -13,7 +13,7 @@ import ( "github.com/containerd/containerd/runtime/linux" "github.com/docker/docker/cmd/dockerd/hack" "github.com/docker/docker/daemon" - "github.com/docker/docker/libcontainerd" + "github.com/docker/docker/libcontainerd/supervisor" "github.com/docker/libnetwork/portallocator" "golang.org/x/sys/unix" ) @@ -36,29 +36,16 @@ func getDaemonConfDir(_ string) string { return "/etc/docker" } -func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) { - opts := []libcontainerd.RemoteOption{ - libcontainerd.WithOOMScore(cli.Config.OOMScoreAdjust), - libcontainerd.WithPlugin("linux", &linux.Config{ +func (cli *DaemonCli) getPlatformContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) { + opts := []supervisor.DaemonOpt{ + supervisor.WithOOMScore(cli.Config.OOMScoreAdjust), + supervisor.WithPlugin("linux", &linux.Config{ Shim: daemon.DefaultShimBinary, Runtime: daemon.DefaultRuntimeBinary, RuntimeRoot: filepath.Join(cli.Config.Root, "runc"), ShimDebug: cli.Config.Debug, }), } - if cli.Config.Debug { - opts = append(opts, libcontainerd.WithLogLevel("debug")) - } else if cli.Config.LogLevel != "" { - opts = append(opts, libcontainerd.WithLogLevel(cli.Config.LogLevel)) - } - if cli.Config.ContainerdAddr != "" { - opts = append(opts, libcontainerd.WithRemoteAddr(cli.Config.ContainerdAddr)) - } else { - opts = append(opts, libcontainerd.WithStartDaemon(true)) - } - if !cli.Config.CriContainerd { - opts = append(opts, libcontainerd.WithPlugin("cri", nil)) - } return opts, nil } diff --git a/cmd/dockerd/daemon_windows.go b/cmd/dockerd/daemon_windows.go index 224c509455..11ff7da871 100644 --- a/cmd/dockerd/daemon_windows.go +++ b/cmd/dockerd/daemon_windows.go @@ -6,7 +6,7 @@ import ( "os" "path/filepath" - "github.com/docker/docker/libcontainerd" + "github.com/docker/docker/libcontainerd/supervisor" "github.com/sirupsen/logrus" "golang.org/x/sys/windows" ) @@ -48,7 +48,7 @@ func notifyShutdown(err error) { } } -func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) { +func (cli *DaemonCli) getPlatformContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) { return nil, nil } diff --git a/daemon/daemon.go b/daemon/daemon.go index c579766e95..e3ffe66699 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -18,6 +18,11 @@ import ( "sync" "time" + "google.golang.org/grpc" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/defaults" + "github.com/containerd/containerd/pkg/dialer" "github.com/docker/docker/api/types" containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/swarm" @@ -94,6 +99,7 @@ type Daemon struct { PluginStore *plugin.Store // todo: remove pluginManager *plugin.Manager linkIndex *linkIndex + containerdCli *containerd.Client containerd libcontainerd.Client defaultIsolation containertypes.Isolation // Default isolation mode on Windows clusterProvider cluster.Provider @@ -565,9 +571,14 @@ func (daemon *Daemon) IsSwarmCompatible() error { // NewDaemon sets up everything for the daemon to be able to service // requests from the webserver. -func NewDaemon(config *config.Config, registryService registry.Service, containerdRemote libcontainerd.Remote, pluginStore *plugin.Store) (daemon *Daemon, err error) { +func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.Store) (daemon *Daemon, err error) { setDefaultMtu(config) + registryService, err := registry.NewService(config.ServiceOptions) + if err != nil { + return nil, err + } + // Ensure that we have a correct root key limit for launching containers. if err := ModifyRootKeyLimit(); err != nil { logrus.Warnf("unable to modify root key limit, number of containers could be limited by this quota: %v", err) @@ -720,8 +731,35 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe } registerMetricsPluginCallback(d.PluginStore, metricsSockPath) + gopts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBackoffMaxDelay(3 * time.Second), + grpc.WithDialer(dialer.Dialer), + + // TODO(stevvooe): We may need to allow configuration of this on the client. + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), + } + if config.ContainerdAddr != "" { + d.containerdCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(ContainersNamespace), containerd.WithDialOpts(gopts)) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr) + } + } + createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) { - return pluginexec.New(getPluginExecRoot(config.Root), containerdRemote, m) + var pluginCli *containerd.Client + + // Windows is not currently using containerd, keep the + // client as nil + if config.ContainerdAddr != "" { + pluginCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(pluginexec.PluginNamespace), containerd.WithDialOpts(gopts)) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr) + } + } + + return pluginexec.New(ctx, getPluginExecRoot(config.Root), pluginCli, m) } // Plugin system initialization should happen before restore. Do not change order. @@ -880,7 +918,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe go d.execCommandGC() - d.containerd, err = containerdRemote.NewClient(ContainersNamespace, d) + d.containerd, err = libcontainerd.NewClient(ctx, d.containerdCli, filepath.Join(config.ExecRoot, "containerd"), ContainersNamespace, d) if err != nil { return nil, err } @@ -1037,6 +1075,10 @@ func (daemon *Daemon) Shutdown() error { daemon.netController.Stop() } + if daemon.containerdCli != nil { + daemon.containerdCli.Close() + } + return daemon.cleanupMounts() } diff --git a/libcontainerd/client_daemon.go b/libcontainerd/client_daemon.go index 1bea770c29..2fa7d63b3f 100644 --- a/libcontainerd/client_daemon.go +++ b/libcontainerd/client_daemon.go @@ -102,38 +102,34 @@ func (c *container) getOOMKilled() bool { type client struct { sync.RWMutex // protects containers map - remote *containerd.Client + client *containerd.Client stateDir string logger *logrus.Entry + ns string - namespace string backend Backend eventQ queue containers map[string]*container } -func (c *client) reconnect() error { - c.Lock() - err := c.remote.Reconnect() - c.Unlock() - return err -} +// NewClient creates a new libcontainerd client from a containerd client +func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b Backend) (Client, error) { + c := &client{ + client: cli, + stateDir: stateDir, + logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns), + ns: ns, + backend: b, + containers: make(map[string]*container), + } -func (c *client) setRemote(remote *containerd.Client) { - c.Lock() - c.remote = remote - c.Unlock() -} + go c.processEventStream(ctx, ns) -func (c *client) getRemote() *containerd.Client { - c.RLock() - remote := c.remote - c.RUnlock() - return remote + return c, nil } func (c *client) Version(ctx context.Context) (containerd.Version, error) { - return c.getRemote().Version(ctx) + return c.client.Version(ctx) } // Restore loads the containerd container. @@ -170,7 +166,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba err = wrapError(err) }() - ctr, err := c.getRemote().LoadContainer(ctx, id) + ctr, err := c.client.LoadContainer(ctx, id) if err != nil { return false, -1, errors.WithStack(wrapError(err)) } @@ -225,7 +221,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created") - cdCtr, err := c.getRemote().NewContainer(ctx, id, + cdCtr, err := c.client.NewContainer(ctx, id, containerd.WithSpec(ociSpec), // TODO(mlaventure): when containerd support lcow, revisit runtime value containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions)) @@ -268,7 +264,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin // remove the checkpoint when we're done defer func() { if cp != nil { - err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest) + err := c.client.ContentStore().Delete(context.Background(), cp.Digest) if err != nil { c.logger.WithError(err).WithFields(logrus.Fields{ "ref": checkpointDir, @@ -571,14 +567,14 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi } // Whatever happens, delete the checkpoint from containerd defer func() { - err := c.getRemote().ImageService().Delete(context.Background(), img.Name()) + err := c.client.ImageService().Delete(context.Background(), img.Name()) if err != nil { c.logger.WithError(err).WithField("digest", img.Target().Digest). Warnf("failed to delete checkpoint image") } }() - b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target()) + b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target()) if err != nil { return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data")) } @@ -598,7 +594,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi return errdefs.System(errors.Wrapf(err, "invalid checkpoint")) } - rat, err := c.getRemote().ContentStore().ReaderAt(ctx, *cpDesc) + rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc) if err != nil { return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader")) } @@ -735,7 +731,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) { }) } -func (c *client) processEventStream(ctx context.Context) { +func (c *client) processEventStream(ctx context.Context, ns string) { var ( err error ev *events.Envelope @@ -746,9 +742,9 @@ func (c *client) processEventStream(ctx context.Context) { // Filter on both namespace *and* topic. To create an "and" filter, // this must be a single, comma-separated string - eventStream, errC := c.getRemote().EventService().Subscribe(ctx, "namespace=="+c.namespace+",topic~=|^/tasks/|") + eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|") - c.logger.WithField("namespace", c.namespace).Debug("processing event stream") + c.logger.Debug("processing event stream") var oomKilled bool for { @@ -758,7 +754,7 @@ func (c *client) processEventStream(ctx context.Context) { errStatus, ok := status.FromError(err) if !ok || errStatus.Code() != codes.Canceled { c.logger.WithError(err).Error("failed to get event") - go c.processEventStream(ctx) + go c.processEventStream(ctx, ns) } else { c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown") } @@ -858,7 +854,7 @@ func (c *client) processEventStream(ctx context.Context) { } func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { - writer, err := c.getRemote().ContentStore().Writer(ctx, content.WithRef(ref)) + writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref)) if err != nil { return nil, err } diff --git a/libcontainerd/client_local_windows.go b/libcontainerd/client_local_windows.go index 89d18c6ba7..85f0c18584 100644 --- a/libcontainerd/client_local_windows.go +++ b/libcontainerd/client_local_windows.go @@ -71,6 +71,28 @@ const ( // of docker. const defaultOwner = "docker" +type client struct { + sync.Mutex + + stateDir string + backend Backend + logger *logrus.Entry + eventQ queue + containers map[string]*container +} + +// NewClient creates a new local executor for windows +func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b Backend) (Client, error) { + c := &client{ + stateDir: stateDir, + backend: b, + logger: logrus.WithField("module", "libcontainerd").WithField("module", "libcontainerd").WithField("namespace", ns), + containers: make(map[string]*container), + } + + return c, nil +} + func (c *client) Version(ctx context.Context) (containerd.Version, error) { return containerd.Version{}, errors.New("not implemented on Windows") } diff --git a/libcontainerd/remote_daemon_options.go b/libcontainerd/remote_daemon_options.go deleted file mode 100644 index f035756ed5..0000000000 --- a/libcontainerd/remote_daemon_options.go +++ /dev/null @@ -1,142 +0,0 @@ -// +build !windows - -package libcontainerd // import "github.com/docker/docker/libcontainerd" - -import "fmt" - -// WithRemoteAddr sets the external containerd socket to connect to. -func WithRemoteAddr(addr string) RemoteOption { - return rpcAddr(addr) -} - -type rpcAddr string - -func (a rpcAddr) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.GRPC.Address = string(a) - return nil - } - return fmt.Errorf("WithRemoteAddr option not supported for this remote") -} - -// WithRemoteAddrUser sets the uid and gid to create the RPC address with -func WithRemoteAddrUser(uid, gid int) RemoteOption { - return rpcUser{uid, gid} -} - -type rpcUser struct { - uid int - gid int -} - -func (u rpcUser) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.GRPC.UID = u.uid - remote.GRPC.GID = u.gid - return nil - } - return fmt.Errorf("WithRemoteAddr option not supported for this remote") -} - -// WithStartDaemon defines if libcontainerd should also run containerd daemon. -func WithStartDaemon(start bool) RemoteOption { - return startDaemon(start) -} - -type startDaemon bool - -func (s startDaemon) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.startDaemon = bool(s) - return nil - } - return fmt.Errorf("WithStartDaemon option not supported for this remote") -} - -// WithLogLevel defines which log level to starts containerd with. -// This only makes sense if WithStartDaemon() was set to true. -func WithLogLevel(lvl string) RemoteOption { - return logLevel(lvl) -} - -type logLevel string - -func (l logLevel) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.Debug.Level = string(l) - return nil - } - return fmt.Errorf("WithDebugLog option not supported for this remote") -} - -// WithDebugAddress defines at which location the debug GRPC connection -// should be made -func WithDebugAddress(addr string) RemoteOption { - return debugAddress(addr) -} - -type debugAddress string - -func (d debugAddress) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.Debug.Address = string(d) - return nil - } - return fmt.Errorf("WithDebugAddress option not supported for this remote") -} - -// WithMetricsAddress defines at which location the debug GRPC connection -// should be made -func WithMetricsAddress(addr string) RemoteOption { - return metricsAddress(addr) -} - -type metricsAddress string - -func (m metricsAddress) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.Metrics.Address = string(m) - return nil - } - return fmt.Errorf("WithMetricsAddress option not supported for this remote") -} - -// WithSnapshotter defines snapshotter driver should be used -func WithSnapshotter(name string) RemoteOption { - return snapshotter(name) -} - -type snapshotter string - -func (s snapshotter) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.snapshotter = string(s) - return nil - } - return fmt.Errorf("WithSnapshotter option not supported for this remote") -} - -// WithPlugin allow configuring a containerd plugin -// configuration values passed needs to be quoted if quotes are needed in -// the toml format. -// Setting the config to nil will disable a built-in plugin -func WithPlugin(name string, conf interface{}) RemoteOption { - return pluginConf{ - name: name, - conf: conf, - } -} - -type pluginConf struct { - // Name is the name of the plugin - name string - conf interface{} -} - -func (p pluginConf) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.pluginConfs.Plugins[p.name] = p.conf - return nil - } - return fmt.Errorf("WithPlugin option not supported for this remote") -} diff --git a/libcontainerd/remote_daemon_options_linux.go b/libcontainerd/remote_daemon_options_linux.go deleted file mode 100644 index a820fb3894..0000000000 --- a/libcontainerd/remote_daemon_options_linux.go +++ /dev/null @@ -1,18 +0,0 @@ -package libcontainerd // import "github.com/docker/docker/libcontainerd" - -import "fmt" - -// WithOOMScore defines the oom_score_adj to set for the containerd process. -func WithOOMScore(score int) RemoteOption { - return oomScore(score) -} - -type oomScore int - -func (o oomScore) Apply(r Remote) error { - if remote, ok := r.(*remote); ok { - remote.OOMScore = int(o) - return nil - } - return fmt.Errorf("WithOOMScore option not supported for this remote") -} diff --git a/libcontainerd/remote_local.go b/libcontainerd/remote_local.go deleted file mode 100644 index 8ea5198b87..0000000000 --- a/libcontainerd/remote_local.go +++ /dev/null @@ -1,59 +0,0 @@ -// +build windows - -package libcontainerd // import "github.com/docker/docker/libcontainerd" - -import ( - "sync" - - "github.com/sirupsen/logrus" -) - -type remote struct { - sync.RWMutex - - logger *logrus.Entry - clients []*client - - // Options - rootDir string - stateDir string -} - -// New creates a fresh instance of libcontainerd remote. -func New(rootDir, stateDir string, options ...RemoteOption) (Remote, error) { - return &remote{ - logger: logrus.WithField("module", "libcontainerd"), - rootDir: rootDir, - stateDir: stateDir, - }, nil -} - -type client struct { - sync.Mutex - - rootDir string - stateDir string - backend Backend - logger *logrus.Entry - eventQ queue - containers map[string]*container -} - -func (r *remote) NewClient(ns string, b Backend) (Client, error) { - c := &client{ - rootDir: r.rootDir, - stateDir: r.stateDir, - backend: b, - logger: r.logger.WithField("namespace", ns), - containers: make(map[string]*container), - } - r.Lock() - r.clients = append(r.clients, c) - r.Unlock() - - return c, nil -} - -func (r *remote) Cleanup() { - // Nothing to do -} diff --git a/libcontainerd/remote_daemon.go b/libcontainerd/supervisor/remote_daemon.go similarity index 55% rename from libcontainerd/remote_daemon.go rename to libcontainerd/supervisor/remote_daemon.go index 07c0ea6bc6..b520d48671 100644 --- a/libcontainerd/remote_daemon.go +++ b/libcontainerd/supervisor/remote_daemon.go @@ -1,6 +1,4 @@ -// +build !windows - -package libcontainerd // import "github.com/docker/docker/libcontainerd" +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" import ( "context" @@ -13,7 +11,6 @@ import ( "strconv" "strings" "sync" - "syscall" "time" "github.com/BurntSushi/toml" @@ -28,6 +25,7 @@ const ( maxConnectionRetryCount = 3 healthCheckTimeout = 3 * time.Second shutdownTimeout = 15 * time.Second + startupTimeout = 15 * time.Second configFile = "containerd.toml" binaryName = "docker-containerd" pidFile = "docker-containerd.pid" @@ -44,28 +42,26 @@ type remote struct { daemonPid int logger *logrus.Entry - daemonWaitCh chan struct{} - clients []*client - shutdownContext context.Context - shutdownCancel context.CancelFunc - shutdown bool + daemonWaitCh chan struct{} + daemonStartCh chan struct{} + daemonStopCh chan struct{} - // Options - startDaemon bool rootDir string stateDir string - snapshotter string pluginConfs pluginConfigs } -// New creates a fresh instance of libcontainerd remote. -func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) { - defer func() { - if err != nil { - err = errors.Wrap(err, "Failed to connect to containerd") - } - }() +// Daemon represents a running containerd daemon +type Daemon interface { + WaitTimeout(time.Duration) error + Address() string +} +// DaemonOpt allows to configure parameters of container daemons +type DaemonOpt func(c *remote) error + +// Start starts a containerd daemon and monitors it +func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Daemon, error) { r := &remote{ rootDir: rootDir, stateDir: stateDir, @@ -73,86 +69,47 @@ func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err err Root: filepath.Join(rootDir, "daemon"), State: filepath.Join(stateDir, "daemon"), }, - pluginConfs: pluginConfigs{make(map[string]interface{})}, - daemonPid: -1, - logger: logrus.WithField("module", "libcontainerd"), + pluginConfs: pluginConfigs{make(map[string]interface{})}, + daemonPid: -1, + logger: logrus.WithField("module", "libcontainerd"), + daemonStartCh: make(chan struct{}), + daemonStopCh: make(chan struct{}), } - r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background()) - rem = r - for _, option := range options { - if err = option.Apply(r); err != nil { - return + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err } } r.setDefaults() - if err = system.MkdirAll(stateDir, 0700, ""); err != nil { - return + if err := system.MkdirAll(stateDir, 0700, ""); err != nil { + return nil, err } - if r.startDaemon { - os.Remove(r.GRPC.Address) - if err = r.startContainerd(); err != nil { - return - } - defer func() { - if err != nil { - r.Cleanup() - } - }() - } + go r.monitorDaemon(ctx) - // This connection is just used to monitor the connection - client, err := containerd.New(r.GRPC.Address) - if err != nil { - return + select { + case <-time.After(startupTimeout): + return nil, errors.New("timeout waiting for containerd to start") + case <-r.daemonStartCh: } - if _, err := client.Version(context.Background()); err != nil { - system.KillProcess(r.daemonPid) - return nil, errors.Wrapf(err, "unable to get containerd version") - } - - go r.monitorConnection(client) return r, nil } - -func (r *remote) NewClient(ns string, b Backend) (Client, error) { - c := &client{ - stateDir: r.stateDir, - logger: r.logger.WithField("namespace", ns), - namespace: ns, - backend: b, - containers: make(map[string]*container), +func (r *remote) WaitTimeout(d time.Duration) error { + select { + case <-time.After(d): + return errors.New("timeout waiting for containerd to stop") + case <-r.daemonStopCh: } - rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns)) - if err != nil { - return nil, err - } - c.remote = rclient - - go c.processEventStream(r.shutdownContext) - - r.Lock() - r.clients = append(r.clients, c) - r.Unlock() - return c, nil + return nil } -func (r *remote) Cleanup() { - if r.daemonPid != -1 { - r.shutdownCancel() - r.stopDaemon() - } - - // cleanup some files - os.Remove(filepath.Join(r.stateDir, pidFile)) - - r.platformCleanup() +func (r *remote) Address() string { + return r.GRPC.Address } - func (r *remote) getContainerdPid() (int, error) { pidFile := filepath.Join(r.stateDir, pidFile) f, err := os.OpenFile(pidFile, os.O_RDWR, 0600) @@ -265,85 +222,90 @@ func (r *remote) startContainerd() error { return nil } -func (r *remote) monitorConnection(monitor *containerd.Client) { - var transientFailureCount = 0 +func (r *remote) monitorDaemon(ctx context.Context) { + var ( + transientFailureCount = 0 + client *containerd.Client + err error + delay <-chan time.Time + started bool + ) + + defer func() { + if r.daemonPid != -1 { + r.stopDaemon() + } + + // cleanup some files + os.Remove(filepath.Join(r.stateDir, pidFile)) + + r.platformCleanup() + + close(r.daemonStopCh) + }() for { select { - case <-r.shutdownContext.Done(): + case <-ctx.Done(): r.logger.Info("stopping healthcheck following graceful shutdown") - monitor.Close() + if client != nil { + client.Close() + } return - case <-time.After(500 * time.Millisecond): + case <-delay: + default: } - ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout) - _, err := monitor.IsServing(ctx) + if r.daemonPid == -1 { + if r.daemonWaitCh != nil { + <-r.daemonWaitCh + } + + os.RemoveAll(r.GRPC.Address) + if err := r.startContainerd(); err != nil { + r.logger.WithError(err).Error("failed starting containerd") + delay = time.After(50 * time.Millisecond) + continue + } + + client, err = containerd.New(r.GRPC.Address) + if err != nil { + r.logger.WithError(err).Error("failed connecting to containerd") + delay = time.After(100 * time.Millisecond) + continue + } + } + + tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) + _, err := client.IsServing(tctx) cancel() if err == nil { - transientFailureCount = 0 - continue - } + if !started { + close(r.daemonStartCh) + started = true + } - select { - case <-r.shutdownContext.Done(): - r.logger.Info("stopping healthcheck following graceful shutdown") - monitor.Close() - return - default: + transientFailureCount = 0 + delay = time.After(500 * time.Millisecond) + continue } r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding") - if r.daemonPid == -1 { - continue - } - transientFailureCount++ if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) { + delay = time.After(time.Duration(transientFailureCount) * 200 * time.Millisecond) continue } - transientFailureCount = 0 if system.IsProcessAlive(r.daemonPid) { r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd") - // Try to get a stack trace - syscall.Kill(r.daemonPid, syscall.SIGUSR1) - <-time.After(100 * time.Millisecond) - system.KillProcess(r.daemonPid) - } - if r.daemonWaitCh != nil { - <-r.daemonWaitCh + r.killDaemon() } - os.Remove(r.GRPC.Address) - if err := r.startContainerd(); err != nil { - r.logger.WithError(err).Error("failed restarting containerd") - continue - } - - if err := monitor.Reconnect(); err != nil { - r.logger.WithError(err).Error("failed connect to containerd") - continue - } - - var wg sync.WaitGroup - - for _, c := range r.clients { - wg.Add(1) - - go func(c *client) { - defer wg.Done() - c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client") - if err := c.reconnect(); err != nil { - r.logger.WithError(err).Error("failed to connect to containerd") - // TODO: Better way to handle this? - // This *shouldn't* happen, but this could wind up where the daemon - // is not able to communicate with an eventually up containerd - } - }(c) - - wg.Wait() - } + client.Close() + r.daemonPid = -1 + delay = nil + transientFailureCount = 0 } } diff --git a/libcontainerd/remote_daemon_linux.go b/libcontainerd/supervisor/remote_daemon_linux.go similarity index 84% rename from libcontainerd/remote_daemon_linux.go rename to libcontainerd/supervisor/remote_daemon_linux.go index e3d9640cea..1ea91d2b5d 100644 --- a/libcontainerd/remote_daemon_linux.go +++ b/libcontainerd/supervisor/remote_daemon_linux.go @@ -1,4 +1,4 @@ -package libcontainerd // import "github.com/docker/docker/libcontainerd" +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" import ( "os" @@ -38,10 +38,6 @@ func (r *remote) setDefaults() { delete(r.pluginConfs.Plugins, key) } } - - if r.snapshotter == "" { - r.snapshotter = "overlay" - } } func (r *remote) stopDaemon() { @@ -61,6 +57,13 @@ func (r *remote) stopDaemon() { } } +func (r *remote) killDaemon() { + // Try to get a stack trace + syscall.Kill(r.daemonPid, syscall.SIGUSR1) + <-time.After(100 * time.Millisecond) + system.KillProcess(r.daemonPid) +} + func (r *remote) platformCleanup() { os.Remove(filepath.Join(r.stateDir, sockFile)) } diff --git a/libcontainerd/supervisor/remote_daemon_options.go b/libcontainerd/supervisor/remote_daemon_options.go new file mode 100644 index 0000000000..4c9387d6ce --- /dev/null +++ b/libcontainerd/supervisor/remote_daemon_options.go @@ -0,0 +1,55 @@ +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" + +// WithRemoteAddr sets the external containerd socket to connect to. +func WithRemoteAddr(addr string) DaemonOpt { + return func(r *remote) error { + r.GRPC.Address = addr + return nil + } +} + +// WithRemoteAddrUser sets the uid and gid to create the RPC address with +func WithRemoteAddrUser(uid, gid int) DaemonOpt { + return func(r *remote) error { + r.GRPC.UID = uid + r.GRPC.GID = gid + return nil + } +} + +// WithLogLevel defines which log level to starts containerd with. +// This only makes sense if WithStartDaemon() was set to true. +func WithLogLevel(lvl string) DaemonOpt { + return func(r *remote) error { + r.Debug.Level = lvl + return nil + } +} + +// WithDebugAddress defines at which location the debug GRPC connection +// should be made +func WithDebugAddress(addr string) DaemonOpt { + return func(r *remote) error { + r.Debug.Address = addr + return nil + } +} + +// WithMetricsAddress defines at which location the debug GRPC connection +// should be made +func WithMetricsAddress(addr string) DaemonOpt { + return func(r *remote) error { + r.Metrics.Address = addr + return nil + } +} + +// WithPlugin allow configuring a containerd plugin +// configuration values passed needs to be quoted if quotes are needed in +// the toml format. +func WithPlugin(name string, conf interface{}) DaemonOpt { + return func(r *remote) error { + r.pluginConfs.Plugins[name] = conf + return nil + } +} diff --git a/libcontainerd/supervisor/remote_daemon_options_linux.go b/libcontainerd/supervisor/remote_daemon_options_linux.go new file mode 100644 index 0000000000..14511dbf93 --- /dev/null +++ b/libcontainerd/supervisor/remote_daemon_options_linux.go @@ -0,0 +1,9 @@ +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" + +// WithOOMScore defines the oom_score_adj to set for the containerd process. +func WithOOMScore(score int) DaemonOpt { + return func(r *remote) error { + r.OOMScore = score + return nil + } +} diff --git a/libcontainerd/remote_daemon_windows.go b/libcontainerd/supervisor/remote_daemon_windows.go similarity index 79% rename from libcontainerd/remote_daemon_windows.go rename to libcontainerd/supervisor/remote_daemon_windows.go index 69cd1ac30c..bcdc9529e0 100644 --- a/libcontainerd/remote_daemon_windows.go +++ b/libcontainerd/supervisor/remote_daemon_windows.go @@ -1,9 +1,9 @@ -// +build remote_daemon - -package libcontainerd // import "github.com/docker/docker/libcontainerd" +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" import ( "os" + + "github.com/docker/docker/pkg/system" ) const ( @@ -18,9 +18,6 @@ func (r *remote) setDefaults() { if r.Debug.Address == "" { r.Debug.Address = debugPipeName } - if r.snapshotter == "" { - r.snapshotter = "naive" // TODO(mlaventure): switch to "windows" once implemented - } } func (r *remote) stopDaemon() { @@ -42,6 +39,10 @@ func (r *remote) stopDaemon() { } } +func (r *remote) killDaemon() { + system.KillProcess(r.daemonPid) +} + func (r *remote) platformCleanup() { // Nothing to do } diff --git a/libcontainerd/utils_linux.go b/libcontainerd/supervisor/utils_linux.go similarity index 74% rename from libcontainerd/utils_linux.go rename to libcontainerd/supervisor/utils_linux.go index ce17d1963d..32ac27f327 100644 --- a/libcontainerd/utils_linux.go +++ b/libcontainerd/supervisor/utils_linux.go @@ -1,4 +1,4 @@ -package libcontainerd // import "github.com/docker/docker/libcontainerd" +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" import "syscall" diff --git a/libcontainerd/supervisor/utils_windows.go b/libcontainerd/supervisor/utils_windows.go new file mode 100644 index 0000000000..987d32bd23 --- /dev/null +++ b/libcontainerd/supervisor/utils_windows.go @@ -0,0 +1,9 @@ +package supervisor // import "github.com/docker/docker/libcontainerd/supervisor" + +import "syscall" + +// containerdSysProcAttr returns the SysProcAttr to use when exec'ing +// containerd +func containerdSysProcAttr() *syscall.SysProcAttr { + return nil +} diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 96ffbe2676..c4de5e674d 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -46,23 +46,6 @@ const ( StatusUnknown Status = "unknown" ) -// Remote on Linux defines the accesspoint to the containerd grpc API. -// Remote on Windows is largely an unimplemented interface as there is -// no remote containerd. -type Remote interface { - // Client returns a new Client instance connected with given Backend. - NewClient(namespace string, backend Backend) (Client, error) - // Cleanup stops containerd if it was started by libcontainerd. - // Note this is not used on Windows as there is no remote containerd. - Cleanup() -} - -// RemoteOption allows to configure parameters of remotes. -// This is unused on Windows. -type RemoteOption interface { - Apply(Remote) error -} - // EventInfo contains the event info type EventInfo struct { ContainerID string diff --git a/libcontainerd/utils_windows.go b/libcontainerd/utils_windows.go index fbf243d4f9..aabb9aeaaa 100644 --- a/libcontainerd/utils_windows.go +++ b/libcontainerd/utils_windows.go @@ -3,8 +3,6 @@ package libcontainerd // import "github.com/docker/docker/libcontainerd" import ( "strings" - "syscall" - opengcs "github.com/Microsoft/opengcs/client" ) @@ -38,9 +36,3 @@ func (c *container) debugGCS() { } cfg.DebugGCS() } - -// containerdSysProcAttr returns the SysProcAttr to use when exec'ing -// containerd -func containerdSysProcAttr() *syscall.SysProcAttr { - return nil -} diff --git a/plugin/executor/containerd/containerd.go b/plugin/executor/containerd/containerd.go index 8f1c8a4a19..a3401dce79 100644 --- a/plugin/executor/containerd/containerd.go +++ b/plugin/executor/containerd/containerd.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/containerd/containerd" "github.com/containerd/containerd/cio" "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/docker/docker/errdefs" @@ -16,8 +17,8 @@ import ( "github.com/sirupsen/logrus" ) -// pluginNamespace is the name used for the plugins namespace -const pluginNamespace = "plugins.moby" +// PluginNamespace is the name used for the plugins namespace +const PluginNamespace = "plugins.moby" // ExitHandler represents an object that is called when the exit event is received from containerd type ExitHandler interface { @@ -38,12 +39,13 @@ type Client interface { } // New creates a new containerd plugin executor -func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) { +func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) { e := &Executor{ rootDir: rootDir, exitHandler: exitHandler, } - client, err := remote.NewClient(pluginNamespace, e) + + client, err := libcontainerd.NewClient(ctx, cli, rootDir, PluginNamespace, e) if err != nil { return nil, errors.Wrap(err, "error creating containerd exec client") }