1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #34117 from cpuguy83/decouple_plugin_manager_from_executor

Decouple plugin manager from libcontainerd package
This commit is contained in:
Sebastiaan van Stijn 2017-09-19 21:07:57 +02:00 committed by GitHub
commit b6b85da657
7 changed files with 298 additions and 253 deletions

View file

@ -48,6 +48,7 @@ import (
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/pkg/truncindex"
"github.com/docker/docker/plugin"
pluginexec "github.com/docker/docker/plugin/executor/containerd"
refstore "github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
@ -646,12 +647,16 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
}
registerMetricsPluginCallback(d.PluginStore, metricsSockPath)
createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) {
return pluginexec.New(containerdRemote, m)
}
// Plugin system initialization should happen before restore. Do not change order.
d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{
Root: filepath.Join(config.Root, "plugins"),
ExecRoot: getPluginExecRoot(config.Root),
Store: d.PluginStore,
Executor: containerdRemote,
CreateExecutor: createPluginExec,
RegistryService: registryService,
LiveRestoreEnabled: config.LiveRestoreEnabled,
LogPluginEvent: d.LogPluginEvent, // todo: make private

View file

@ -1,9 +1,19 @@
package plugin
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/plugin"
"github.com/docker/docker/registry"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -32,3 +42,142 @@ func WithBinary(bin string) CreateOpt {
type CreateClient interface {
PluginCreate(context.Context, io.Reader, types.PluginCreateOptions) error
}
// Create creates a new plugin with the specified name
func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
tmpDir, err := ioutil.TempDir("", "create-test-plugin")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
tar, err := makePluginBundle(tmpDir, opts...)
if err != nil {
return err
}
defer tar.Close()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name})
}
// CreateInRegistry makes a plugin (locally) and pushes it to a registry.
// This does not use a dockerd instance to create or push the plugin.
// If you just want to create a plugin in some daemon, use `Create`.
//
// This can be useful when testing plugins on swarm where you don't really want
// the plugin to exist on any of the daemons (immediately) and there needs to be
// some way to distribute the plugin.
func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
tmpDir, err := ioutil.TempDir("", "create-test-plugin-local")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
inPath := filepath.Join(tmpDir, "plugin")
if err := os.MkdirAll(inPath, 0755); err != nil {
return errors.Wrap(err, "error creating plugin root")
}
tar, err := makePluginBundle(inPath, opts...)
if err != nil {
return err
}
defer tar.Close()
dummyExec := func(m *plugin.Manager) (plugin.Executor, error) {
return nil, nil
}
regService, err := registry.NewService(registry.ServiceOptions{V2Only: true})
if err != nil {
return err
}
managerConfig := plugin.ManagerConfig{
Store: plugin.NewStore(),
RegistryService: regService,
Root: filepath.Join(tmpDir, "root"),
ExecRoot: "/run/docker", // manager init fails if not set
CreateExecutor: dummyExec,
LogPluginEvent: func(id, name, action string) {}, // panics when not set
}
manager, err := plugin.NewManager(managerConfig)
if err != nil {
return errors.Wrap(err, "error creating plugin manager")
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil {
return err
}
if auth == nil {
auth = &types.AuthConfig{}
}
err = manager.Push(ctx, repo, nil, auth, ioutil.Discard)
return errors.Wrap(err, "error pushing plugin")
}
func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) {
p := &types.PluginConfig{
Interface: types.PluginConfigInterface{
Socket: "basic.sock",
Types: []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}},
},
Entrypoint: []string{"/basic"},
}
cfg := &Config{
PluginConfig: p,
}
for _, o := range opts {
o(cfg)
}
if cfg.binPath == "" {
binPath, err := ensureBasicPluginBin()
if err != nil {
return nil, err
}
cfg.binPath = binPath
}
configJSON, err := json.Marshal(p)
if err != nil {
return nil, err
}
if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil {
return nil, err
}
if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil {
return nil, errors.Wrap(err, "error creating plugin rootfs dir")
}
if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil {
return nil, errors.Wrap(err, "error copying plugin binary to rootfs path")
}
tar, err := archive.Tar(inPath, archive.Uncompressed)
return tar, errors.Wrap(err, "error making plugin archive")
}
func ensureBasicPluginBin() (string, error) {
name := "docker-basic-plugin"
p, err := exec.LookPath(name)
if err == nil {
return p, nil
}
goBin, err := exec.LookPath("go")
if err != nil {
return "", err
}
installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name)
cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic"))
cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
if out, err := cmd.CombinedOutput(); err != nil {
return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out))
}
return installPath, nil
}

View file

@ -1,162 +0,0 @@
package plugin
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/plugin"
"github.com/docker/docker/registry"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// Create creates a new plugin with the specified name
func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
tmpDir, err := ioutil.TempDir("", "create-test-plugin")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
tar, err := makePluginBundle(tmpDir, opts...)
if err != nil {
return err
}
defer tar.Close()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name})
}
// TODO(@cpuguy83): we really shouldn't have to do this...
// The manager panics on init when `Executor` is not set.
type dummyExecutor struct{}
func (dummyExecutor) Client(libcontainerd.Backend) (libcontainerd.Client, error) { return nil, nil }
func (dummyExecutor) Cleanup() {}
func (dummyExecutor) UpdateOptions(...libcontainerd.RemoteOption) error { return nil }
// CreateInRegistry makes a plugin (locally) and pushes it to a registry.
// This does not use a dockerd instance to create or push the plugin.
// If you just want to create a plugin in some daemon, use `Create`.
//
// This can be useful when testing plugins on swarm where you don't really want
// the plugin to exist on any of the daemons (immediately) and there needs to be
// some way to distribute the plugin.
func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
tmpDir, err := ioutil.TempDir("", "create-test-plugin-local")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
inPath := filepath.Join(tmpDir, "plugin")
if err := os.MkdirAll(inPath, 0755); err != nil {
return errors.Wrap(err, "error creating plugin root")
}
tar, err := makePluginBundle(inPath, opts...)
if err != nil {
return err
}
defer tar.Close()
regService, err := registry.NewService(registry.ServiceOptions{V2Only: true})
if err != nil {
return err
}
managerConfig := plugin.ManagerConfig{
Store: plugin.NewStore(),
RegistryService: regService,
Root: filepath.Join(tmpDir, "root"),
ExecRoot: "/run/docker", // manager init fails if not set
Executor: dummyExecutor{},
LogPluginEvent: func(id, name, action string) {}, // panics when not set
}
manager, err := plugin.NewManager(managerConfig)
if err != nil {
return errors.Wrap(err, "error creating plugin manager")
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil {
return err
}
if auth == nil {
auth = &types.AuthConfig{}
}
err = manager.Push(ctx, repo, nil, auth, ioutil.Discard)
return errors.Wrap(err, "error pushing plugin")
}
func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) {
p := &types.PluginConfig{
Interface: types.PluginConfigInterface{
Socket: "basic.sock",
Types: []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}},
},
Entrypoint: []string{"/basic"},
}
cfg := &Config{
PluginConfig: p,
}
for _, o := range opts {
o(cfg)
}
if cfg.binPath == "" {
binPath, err := ensureBasicPluginBin()
if err != nil {
return nil, err
}
cfg.binPath = binPath
}
configJSON, err := json.Marshal(p)
if err != nil {
return nil, err
}
if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil {
return nil, err
}
if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil {
return nil, errors.Wrap(err, "error creating plugin rootfs dir")
}
if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil {
return nil, errors.Wrap(err, "error copying plugin binary to rootfs path")
}
tar, err := archive.Tar(inPath, archive.Uncompressed)
return tar, errors.Wrap(err, "error making plugin archive")
}
func ensureBasicPluginBin() (string, error) {
name := "docker-basic-plugin"
p, err := exec.LookPath(name)
if err == nil {
return p, nil
}
goBin, err := exec.LookPath("go")
if err != nil {
return "", err
}
installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name)
cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic"))
cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
if out, err := cmd.CombinedOutput(); err != nil {
return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out))
}
return installPath, nil
}

View file

@ -1,19 +0,0 @@
// +build !linux
package plugin
import (
"github.com/docker/docker/api/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// Create is not supported on this platform
func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
return errors.New("not supported on this platform")
}
// CreateInRegistry is not supported on this platform
func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
return errors.New("not supported on this platform")
}

View file

@ -0,0 +1,77 @@
package containerd
import (
"io"
"github.com/docker/docker/libcontainerd"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
// ExitHandler represents an object that is called when the exit event is received from containerd
type ExitHandler interface {
HandleExitEvent(id string) error
}
// New creates a new containerd plugin executor
func New(remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
e := &Executor{exitHandler: exitHandler}
client, err := remote.Client(e)
if err != nil {
return nil, errors.Wrap(err, "error creating containerd exec client")
}
e.client = client
return e, nil
}
// Executor is the containerd client implementation of a plugin executor
type Executor struct {
client libcontainerd.Client
exitHandler ExitHandler
}
// Create creates a new container
func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
return e.client.Create(id, "", "", spec, attachStreamsFunc(stdout, stderr))
}
// Restore restores a container
func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error {
return e.client.Restore(id, attachStreamsFunc(stdout, stderr))
}
// IsRunning returns if the container with the given id is running
func (e *Executor) IsRunning(id string) (bool, error) {
pids, err := e.client.GetPidsForContainer(id)
return len(pids) > 0, err
}
// Signal sends the specified signal to the container
func (e *Executor) Signal(id string, signal int) error {
return e.client.Signal(id, signal)
}
// StateChanged handles state changes from containerd
// All events are ignored except the exit event, which is sent of to the stored handler
func (e *Executor) StateChanged(id string, event libcontainerd.StateInfo) error {
switch event.State {
case libcontainerd.StateExit:
return e.exitHandler.HandleExitEvent(id)
}
return nil
}
func attachStreamsFunc(stdout, stderr io.WriteCloser) func(libcontainerd.IOPipe) error {
return func(iop libcontainerd.IOPipe) error {
iop.Stdin.Close()
go func() {
io.Copy(stdout, iop.Stdout)
stdout.Close()
}()
go func() {
io.Copy(stderr, iop.Stderr)
stderr.Close()
}()
return nil
}
}

View file

@ -17,7 +17,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/authorization"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/mount"
@ -26,6 +25,7 @@ import (
"github.com/docker/docker/plugin/v2"
"github.com/docker/docker/registry"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -35,6 +35,14 @@ const rootFSFileName = "rootfs"
var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
// Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
type Executor interface {
Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
Restore(id string, stdout, stderr io.WriteCloser) error
IsRunning(id string) (bool, error)
Signal(id string, signal int) error
}
func (pm *Manager) restorePlugin(p *v2.Plugin) error {
if p.IsEnabled() {
return pm.restore(p)
@ -47,24 +55,27 @@ type eventLogger func(id, name, action string)
// ManagerConfig defines configuration needed to start new manager.
type ManagerConfig struct {
Store *Store // remove
Executor libcontainerd.Remote
RegistryService registry.Service
LiveRestoreEnabled bool // TODO: remove
LogPluginEvent eventLogger
Root string
ExecRoot string
CreateExecutor ExecutorCreator
AuthzMiddleware *authorization.Middleware
}
// ExecutorCreator is used in the manager config to pass in an `Executor`
type ExecutorCreator func(*Manager) (Executor, error)
// Manager controls the plugin subsystem.
type Manager struct {
config ManagerConfig
mu sync.RWMutex // protects cMap
muGC sync.RWMutex // protects blobstore deletions
cMap map[*v2.Plugin]*controller
containerdClient libcontainerd.Client
blobStore *basicBlobStore
publisher *pubsub.Publisher
executor Executor
}
// controller represents the manager's control on a plugin.
@ -111,10 +122,11 @@ func NewManager(config ManagerConfig) (*Manager, error) {
}
var err error
manager.containerdClient, err = config.Executor.Client(manager) // todo: move to another struct
manager.executor, err = config.CreateExecutor(manager)
if err != nil {
return nil, errors.Wrap(err, "failed to create containerd client")
return nil, err
}
manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
if err != nil {
return nil, err
@ -133,12 +145,9 @@ func (pm *Manager) tmpDir() string {
return filepath.Join(pm.config.Root, "tmp")
}
// StateChanged updates plugin internals using libcontainerd events.
func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
logrus.Debugf("plugin state changed %s %#v", id, e)
switch e.State {
case libcontainerd.StateExit:
// HandleExitEvent is called when the executor receives the exit event
// In the future we may change this, but for now all we care about is the exit event.
func (pm *Manager) HandleExitEvent(id string) error {
p, err := pm.config.Store.GetV2Plugin(id)
if err != nil {
return err
@ -167,8 +176,6 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
if restart {
pm.enable(p, c, true)
}
}
return nil
}
@ -333,23 +340,10 @@ func (l logHook) Fire(entry *logrus.Entry) error {
return nil
}
func attachToLog(id string) func(libcontainerd.IOPipe) error {
return func(iop libcontainerd.IOPipe) error {
iop.Stdin.Close()
func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) {
logger := logrus.New()
logger.Hooks.Add(logHook{id})
// TODO: cache writer per id
w := logger.Writer()
go func() {
io.Copy(w, iop.Stdout)
}()
go func() {
// TODO: update logrus and use logger.WriterLevel
io.Copy(w, iop.Stderr)
}()
return nil
}
return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel)
}
func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {

View file

@ -11,7 +11,6 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/daemon/initlayer"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/mount"
@ -63,7 +62,8 @@ func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
return errors.WithStack(err)
}
if err := pm.containerdClient.Create(p.GetID(), "", "", *spec, attachToLog(p.GetID())); err != nil {
stdout, stderr := makeLoggerStreams(p.GetID())
if err := pm.executor.Create(p.GetID(), *spec, stdout, stderr); err != nil {
if p.PropagatedMount != "" {
if err := mount.Unmount(p.PropagatedMount); err != nil {
logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
@ -83,7 +83,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
client, err := plugins.NewClientWithTimeout("unix://"+sockAddr, nil, time.Duration(c.timeoutInSecs)*time.Second)
if err != nil {
c.restart = false
shutdownPlugin(p, c, pm.containerdClient)
shutdownPlugin(p, c, pm.executor)
return errors.WithStack(err)
}
@ -109,7 +109,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
c.restart = false
// While restoring plugins, we need to explicitly set the state to disabled
pm.config.Store.SetState(p, false)
shutdownPlugin(p, c, pm.containerdClient)
shutdownPlugin(p, c, pm.executor)
return err
}
@ -121,13 +121,14 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
}
func (pm *Manager) restore(p *v2.Plugin) error {
if err := pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID())); err != nil {
stdout, stderr := makeLoggerStreams(p.GetID())
if err := pm.executor.Restore(p.GetID(), stdout, stderr); err != nil {
return err
}
if pm.config.LiveRestoreEnabled {
c := &controller{}
if pids, _ := pm.containerdClient.GetPidsForContainer(p.GetID()); len(pids) == 0 {
if isRunning, _ := pm.executor.IsRunning(p.GetID()); !isRunning {
// plugin is not running, so follow normal startup procedure
return pm.enable(p, c, true)
}
@ -143,10 +144,10 @@ func (pm *Manager) restore(p *v2.Plugin) error {
return nil
}
func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.Client) {
func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
pluginID := p.GetID()
err := containerdClient.Signal(pluginID, int(unix.SIGTERM))
err := executor.Signal(pluginID, int(unix.SIGTERM))
if err != nil {
logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
} else {
@ -155,7 +156,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.
logrus.Debug("Clean shutdown of plugin")
case <-time.After(time.Second * 10):
logrus.Debug("Force shutdown plugin")
if err := containerdClient.Signal(pluginID, int(unix.SIGKILL)); err != nil {
if err := executor.Signal(pluginID, int(unix.SIGKILL)); err != nil {
logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
}
}
@ -175,7 +176,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
}
c.restart = false
shutdownPlugin(p, c, pm.containerdClient)
shutdownPlugin(p, c, pm.executor)
pm.config.Store.SetState(p, false)
return pm.save(p)
}
@ -192,9 +193,9 @@ func (pm *Manager) Shutdown() {
logrus.Debug("Plugin active when liveRestore is set, skipping shutdown")
continue
}
if pm.containerdClient != nil && p.IsEnabled() {
if pm.executor != nil && p.IsEnabled() {
c.restart = false
shutdownPlugin(p, c, pm.containerdClient)
shutdownPlugin(p, c, pm.executor)
}
}
mount.Unmount(pm.config.Root)