1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/plugin/manager.go
Anusha Ragunathan 57499fa62e When handling plugin exit, lookup plugins only during daemon shutdown.
The main intent of handling plugin exit is for graceful shutdown
of plugins during daemon shutdown. So avoid plugin lookup during
plugin exits caused by other reasons (eg. force remove)

Signed-off-by: Anusha Ragunathan <anusha@docker.com>
2016-08-15 14:46:02 -07:00

459 lines
11 KiB
Go

// +build experimental
package plugin
import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/docker/restartmanager"
"github.com/docker/engine-api/types"
)
const defaultPluginRuntimeDestination = "/run/docker/plugins"
var manager *Manager
// ErrNotFound indicates that a plugin was not found locally.
type ErrNotFound string
func (name ErrNotFound) Error() string { return fmt.Sprintf("plugin %q not found", string(name)) }
// ErrInadequateCapability indicates that a plugin was found but did not have the requested capability.
type ErrInadequateCapability struct {
name string
capability string
}
func (e ErrInadequateCapability) Error() string {
return fmt.Sprintf("plugin %q found, but not with %q capability", e.name, e.capability)
}
type plugin struct {
//sync.RWMutex TODO
PluginObj types.Plugin `json:"plugin"`
client *plugins.Client
restartManager restartmanager.RestartManager
runtimeSourcePath string
exitChan chan bool
}
func (p *plugin) Client() *plugins.Client {
return p.client
}
// IsLegacy returns true for legacy plugins and false otherwise.
func (p *plugin) IsLegacy() bool {
return false
}
func (p *plugin) Name() string {
name := p.PluginObj.Name
if len(p.PluginObj.Tag) > 0 {
// TODO: this feels hacky, maybe we should be storing the distribution reference rather than splitting these
name += ":" + p.PluginObj.Tag
}
return name
}
func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
p := &plugin{
PluginObj: types.Plugin{
Name: ref.Name(),
ID: id,
},
runtimeSourcePath: filepath.Join(pm.runRoot, id),
}
if ref, ok := ref.(reference.NamedTagged); ok {
p.PluginObj.Tag = ref.Tag()
}
return p
}
func (pm *Manager) restorePlugin(p *plugin) error {
p.runtimeSourcePath = filepath.Join(pm.runRoot, p.PluginObj.ID)
if p.PluginObj.Active {
return pm.restore(p)
}
return nil
}
type pluginMap map[string]*plugin
type eventLogger func(id, name, action string)
// Manager controls the plugin subsystem.
type Manager struct {
sync.RWMutex
libRoot string
runRoot string
plugins pluginMap // TODO: figure out why save() doesn't json encode *plugin object
nameToID map[string]string
handlers map[string]func(string, *plugins.Client)
containerdClient libcontainerd.Client
registryService registry.Service
handleLegacy bool
liveRestore bool
shutdown bool
pluginEventLogger eventLogger
}
// GetManager returns the singleton plugin Manager
func GetManager() *Manager {
return manager
}
// Init (was NewManager) instantiates the singleton Manager.
// TODO: revert this to NewManager once we get rid of all the singletons.
func Init(root string, remote libcontainerd.Remote, rs registry.Service, liveRestore bool, evL eventLogger) (err error) {
if manager != nil {
return nil
}
root = filepath.Join(root, "plugins")
manager = &Manager{
libRoot: root,
runRoot: "/run/docker",
plugins: make(map[string]*plugin),
nameToID: make(map[string]string),
handlers: make(map[string]func(string, *plugins.Client)),
registryService: rs,
handleLegacy: true,
liveRestore: liveRestore,
pluginEventLogger: evL,
}
if err := os.MkdirAll(manager.runRoot, 0700); err != nil {
return err
}
manager.containerdClient, err = remote.Client(manager)
if err != nil {
return err
}
if err := manager.init(); err != nil {
return err
}
return nil
}
// Handle sets a callback for a given capability. The callback will be called for every plugin with a given capability.
// TODO: append instead of set?
func Handle(capability string, callback func(string, *plugins.Client)) {
pluginType := fmt.Sprintf("docker.%s/1", strings.ToLower(capability))
manager.handlers[pluginType] = callback
if manager.handleLegacy {
plugins.Handle(capability, callback)
}
}
func (pm *Manager) get(name string) (*plugin, error) {
pm.RLock()
defer pm.RUnlock()
id, nameOk := pm.nameToID[name]
if !nameOk {
return nil, ErrNotFound(name)
}
p, idOk := pm.plugins[id]
if !idOk {
return nil, ErrNotFound(name)
}
return p, nil
}
// FindWithCapability returns a list of plugins matching the given capability.
func FindWithCapability(capability string) ([]Plugin, error) {
handleLegacy := true
result := make([]Plugin, 0, 1)
if manager != nil {
handleLegacy = manager.handleLegacy
manager.RLock()
defer manager.RUnlock()
pluginLoop:
for _, p := range manager.plugins {
for _, typ := range p.PluginObj.Manifest.Interface.Types {
if typ.Capability != capability || typ.Prefix != "docker" {
continue pluginLoop
}
}
result = append(result, p)
}
}
if handleLegacy {
pl, err := plugins.GetAll(capability)
if err != nil {
return nil, fmt.Errorf("legacy plugin: %v", err)
}
for _, p := range pl {
if _, ok := manager.nameToID[p.Name()]; !ok {
result = append(result, p)
}
}
}
return result, nil
}
// LookupWithCapability returns a plugin matching the given name and capability.
func LookupWithCapability(name, capability string) (Plugin, error) {
var (
p *plugin
err error
)
handleLegacy := true
if manager != nil {
fullName := name
if named, err := reference.ParseNamed(fullName); err == nil { // FIXME: validate
if reference.IsNameOnly(named) {
named = reference.WithDefaultTag(named)
}
ref, ok := named.(reference.NamedTagged)
if !ok {
return nil, fmt.Errorf("invalid name: %s", named.String())
}
fullName = ref.String()
}
p, err = manager.get(fullName)
if err != nil {
if _, ok := err.(ErrNotFound); !ok {
return nil, err
}
handleLegacy = manager.handleLegacy
} else {
handleLegacy = false
}
}
if handleLegacy {
p, err := plugins.Get(name, capability)
if err != nil {
return nil, fmt.Errorf("legacy plugin: %v", err)
}
return p, nil
} else if err != nil {
return nil, err
}
capability = strings.ToLower(capability)
for _, typ := range p.PluginObj.Manifest.Interface.Types {
if typ.Capability == capability && typ.Prefix == "docker" {
return p, nil
}
}
return nil, ErrInadequateCapability{name, capability}
}
// StateChanged updates plugin internals using libcontainerd events.
func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
logrus.Debugf("plugin state changed %s %#v", id, e)
switch e.State {
case libcontainerd.StateExit:
var shutdown bool
pm.RLock()
shutdown = pm.shutdown
pm.RUnlock()
if shutdown {
pm.RLock()
p, idOk := pm.plugins[id]
pm.RUnlock()
if !idOk {
return ErrNotFound(id)
}
close(p.exitChan)
}
}
return nil
}
// AttachStreams attaches io streams to the plugin
func (pm *Manager) AttachStreams(id string, iop libcontainerd.IOPipe) error {
iop.Stdin.Close()
logger := logrus.New()
logger.Hooks.Add(logHook{id})
// TODO: cache writer per id
w := logger.Writer()
go func() {
io.Copy(w, iop.Stdout)
}()
go func() {
// TODO: update logrus and use logger.WriterLevel
io.Copy(w, iop.Stderr)
}()
return nil
}
func (pm *Manager) init() error {
dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer dt.Close()
if err := json.NewDecoder(dt).Decode(&pm.plugins); err != nil {
return err
}
var group sync.WaitGroup
group.Add(len(pm.plugins))
for _, p := range pm.plugins {
go func(p *plugin) {
defer group.Done()
if err := pm.restorePlugin(p); err != nil {
logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
return
}
pm.Lock()
pm.nameToID[p.Name()] = p.PluginObj.ID
requiresManualRestore := !pm.liveRestore && p.PluginObj.Active
pm.Unlock()
if requiresManualRestore {
// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
if err := pm.enable(p, true); err != nil {
logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
}
}
}(p)
}
group.Wait()
return pm.save()
}
func (pm *Manager) initPlugin(p *plugin) error {
dt, err := os.Open(filepath.Join(pm.libRoot, p.PluginObj.ID, "manifest.json"))
if err != nil {
return err
}
err = json.NewDecoder(dt).Decode(&p.PluginObj.Manifest)
dt.Close()
if err != nil {
return err
}
p.PluginObj.Config.Mounts = make([]types.PluginMount, len(p.PluginObj.Manifest.Mounts))
for i, mount := range p.PluginObj.Manifest.Mounts {
p.PluginObj.Config.Mounts[i] = mount
}
p.PluginObj.Config.Env = make([]string, 0, len(p.PluginObj.Manifest.Env))
for _, env := range p.PluginObj.Manifest.Env {
if env.Value != nil {
p.PluginObj.Config.Env = append(p.PluginObj.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value))
}
}
copy(p.PluginObj.Config.Args, p.PluginObj.Manifest.Args.Value)
f, err := os.Create(filepath.Join(pm.libRoot, p.PluginObj.ID, "plugin-config.json"))
if err != nil {
return err
}
err = json.NewEncoder(f).Encode(&p.PluginObj.Config)
f.Close()
return err
}
func (pm *Manager) remove(p *plugin, force bool) error {
if p.PluginObj.Active {
if !force {
return fmt.Errorf("plugin %s is active", p.Name())
}
if err := pm.disable(p); err != nil {
logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err)
}
}
pm.Lock() // fixme: lock single record
defer pm.Unlock()
delete(pm.plugins, p.PluginObj.ID)
delete(pm.nameToID, p.Name())
pm.save()
return os.RemoveAll(filepath.Join(pm.libRoot, p.PluginObj.ID))
}
func (pm *Manager) set(p *plugin, args []string) error {
m := make(map[string]string, len(args))
for _, arg := range args {
i := strings.Index(arg, "=")
if i < 0 {
return fmt.Errorf("no equal sign '=' found in %s", arg)
}
m[arg[:i]] = arg[i+1:]
}
return errors.New("not implemented")
}
// fixme: not safe
func (pm *Manager) save() error {
filePath := filepath.Join(pm.libRoot, "plugins.json")
jsonData, err := json.Marshal(pm.plugins)
if err != nil {
logrus.Debugf("failure in json.Marshal: %v", err)
return err
}
ioutils.AtomicWriteFile(filePath, jsonData, 0600)
return nil
}
type logHook struct{ id string }
func (logHook) Levels() []logrus.Level {
return logrus.AllLevels
}
func (l logHook) Fire(entry *logrus.Entry) error {
entry.Data = logrus.Fields{"plugin": l.id}
return nil
}
func computePrivileges(m *types.PluginManifest) types.PluginPrivileges {
var privileges types.PluginPrivileges
if m.Network.Type != "null" && m.Network.Type != "bridge" {
privileges = append(privileges, types.PluginPrivilege{
Name: "network",
Description: "",
Value: []string{m.Network.Type},
})
}
for _, mount := range m.Mounts {
if mount.Source != nil {
privileges = append(privileges, types.PluginPrivilege{
Name: "mount",
Description: "",
Value: []string{*mount.Source},
})
}
}
for _, device := range m.Devices {
if device.Path != nil {
privileges = append(privileges, types.PluginPrivilege{
Name: "device",
Description: "",
Value: []string{*device.Path},
})
}
}
if len(m.Capabilities) > 0 {
privileges = append(privileges, types.PluginPrivilege{
Name: "capabilities",
Description: "",
Value: m.Capabilities,
})
}
return privileges
}