1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/pkg/plugins/plugins.go

341 lines
7.6 KiB
Go
Raw Normal View History

// Package plugins provides structures and helper functions to manage Docker
// plugins.
//
// Docker discovers plugins by looking for them in the plugin directory whenever
// a user or container tries to use one by name. UNIX domain socket files must
// be located under /run/docker/plugins, whereas spec files can be located
// either under /etc/docker/plugins or /usr/lib/docker/plugins. This is handled
// by the Registry interface, which lets you list all plugins or get a plugin by
// its name if it exists.
//
// The plugins need to implement an HTTP server and bind this to the UNIX socket
// or the address specified in the spec files.
// A handshake is send at /Plugin.Activate, and plugins are expected to return
// a Manifest with a list of Docker subsystems which this plugin implements.
//
// In order to use a plugins, you can use the ``Get`` with the name of the
// plugin and the subsystem it implements.
//
// plugin, err := plugins.Get("example", "VolumeDriver")
// if err != nil {
// return fmt.Errorf("Error looking up volume plugin example: %v", err)
// }
package plugins // import "github.com/docker/docker/pkg/plugins"
import (
"errors"
"sync"
"time"
"github.com/docker/go-connections/tlsconfig"
"github.com/sirupsen/logrus"
)
// ProtocolSchemeHTTPV1 is the name of the protocol used for interacting with plugins using this package.
const ProtocolSchemeHTTPV1 = "moby.plugins.http/v1"
var (
// ErrNotImplements is returned if the plugin does not implement the requested driver.
ErrNotImplements = errors.New("Plugin does not implement the requested driver")
)
type plugins struct {
sync.Mutex
plugins map[string]*Plugin
}
type extpointHandlers struct {
sync.RWMutex
extpointHandlers map[string][]func(string, *Client)
}
var (
storage = plugins{plugins: make(map[string]*Plugin)}
handlers = extpointHandlers{extpointHandlers: make(map[string][]func(string, *Client))}
)
// Manifest lists what a plugin implements.
type Manifest struct {
// List of subsystem the plugin implements.
Implements []string
}
// Plugin is the definition of a docker plugin.
type Plugin struct {
// Name of the plugin
name string
// Address of the plugin
Addr string
// TLS configuration of the plugin
TLSConfig *tlsconfig.Options
// Client attached to the plugin
client *Client
// Manifest of the plugin (see above)
Manifest *Manifest `json:"-"`
// wait for activation to finish
activateWait *sync.Cond
// error produced by activation
activateErr error
// keeps track of callback handlers run against this plugin
handlersRun bool
}
// Name returns the name of the plugin.
func (p *Plugin) Name() string {
return p.name
}
// Client returns a ready-to-use plugin client that can be used to communicate with the plugin.
func (p *Plugin) Client() *Client {
return p.client
}
// Protocol returns the protocol name/version used for plugins in this package.
func (p *Plugin) Protocol() string {
return ProtocolSchemeHTTPV1
}
// IsV1 returns true for V1 plugins and false otherwise.
func (p *Plugin) IsV1() bool {
return true
}
// NewLocalPlugin creates a new local plugin.
func NewLocalPlugin(name, addr string) *Plugin {
return &Plugin{
name: name,
Addr: addr,
// TODO: change to nil
TLSConfig: &tlsconfig.Options{InsecureSkipVerify: true},
activateWait: sync.NewCond(&sync.Mutex{}),
}
}
func (p *Plugin) activate() error {
p.activateWait.L.Lock()
if p.activated() {
p.runHandlers()
p.activateWait.L.Unlock()
return p.activateErr
}
p.activateErr = p.activateWithLock()
p.runHandlers()
p.activateWait.L.Unlock()
p.activateWait.Broadcast()
return p.activateErr
}
// runHandlers runs the registered handlers for the implemented plugin types
// This should only be run after activation, and while the activation lock is held.
func (p *Plugin) runHandlers() {
if !p.activated() {
return
}
handlers.RLock()
if !p.handlersRun {
for _, iface := range p.Manifest.Implements {
hdlrs, handled := handlers.extpointHandlers[iface]
if !handled {
continue
}
for _, handler := range hdlrs {
handler(p.name, p.client)
}
}
p.handlersRun = true
}
handlers.RUnlock()
}
// activated returns if the plugin has already been activated.
// This should only be called with the activation lock held
func (p *Plugin) activated() bool {
return p.Manifest != nil
}
func (p *Plugin) activateWithLock() error {
c, err := NewClient(p.Addr, p.TLSConfig)
if err != nil {
return err
}
p.client = c
m := new(Manifest)
if err = p.client.Call("Plugin.Activate", nil, m); err != nil {
return err
}
p.Manifest = m
return nil
}
func (p *Plugin) waitActive() error {
p.activateWait.L.Lock()
for !p.activated() && p.activateErr == nil {
p.activateWait.Wait()
}
p.activateWait.L.Unlock()
return p.activateErr
}
func (p *Plugin) implements(kind string) bool {
if p.Manifest == nil {
return false
}
for _, driver := range p.Manifest.Implements {
if driver == kind {
return true
}
}
return false
}
func load(name string) (*Plugin, error) {
return loadWithRetry(name, true)
}
func loadWithRetry(name string, retry bool) (*Plugin, error) {
registry := newLocalRegistry()
start := time.Now()
var retries int
for {
pl, err := registry.Plugin(name)
if err != nil {
if !retry {
return nil, err
}
timeOff := backoff(retries)
if abort(start, timeOff) {
return nil, err
}
retries++
logrus.Warnf("Unable to locate plugin: %s, retrying in %v", name, timeOff)
time.Sleep(timeOff)
continue
}
storage.Lock()
if pl, exists := storage.plugins[name]; exists {
storage.Unlock()
return pl, pl.activate()
}
storage.plugins[name] = pl
storage.Unlock()
err = pl.activate()
if err != nil {
storage.Lock()
delete(storage.plugins, name)
storage.Unlock()
}
return pl, err
}
}
func get(name string) (*Plugin, error) {
storage.Lock()
pl, ok := storage.plugins[name]
storage.Unlock()
if ok {
return pl, pl.activate()
}
return load(name)
}
// Get returns the plugin given the specified name and requested implementation.
func Get(name, imp string) (*Plugin, error) {
if name == "" {
return nil, errors.New("Unable to find plugin without name")
}
pl, err := get(name)
if err != nil {
return nil, err
}
if err := pl.waitActive(); err == nil && pl.implements(imp) {
logrus.Debugf("%s implements: %s", name, imp)
return pl, nil
}
return nil, ErrNotImplements
}
// Handle adds the specified function to the extpointHandlers.
func Handle(iface string, fn func(string, *Client)) {
handlers.Lock()
hdlrs, ok := handlers.extpointHandlers[iface]
if !ok {
hdlrs = []func(string, *Client){}
}
hdlrs = append(hdlrs, fn)
handlers.extpointHandlers[iface] = hdlrs
storage.Lock()
for _, p := range storage.plugins {
p.activateWait.L.Lock()
if p.activated() && p.implements(iface) {
p.handlersRun = false
}
p.activateWait.L.Unlock()
}
storage.Unlock()
handlers.Unlock()
}
// GetAll returns all the plugins for the specified implementation
func GetAll(imp string) ([]*Plugin, error) {
pluginNames, err := Scan()
if err != nil {
return nil, err
}
type plLoad struct {
pl *Plugin
err error
}
chPl := make(chan *plLoad, len(pluginNames))
var wg sync.WaitGroup
for _, name := range pluginNames {
storage.Lock()
pl, ok := storage.plugins[name]
storage.Unlock()
if ok {
chPl <- &plLoad{pl, nil}
continue
}
wg.Add(1)
go func(name string) {
defer wg.Done()
pl, err := loadWithRetry(name, false)
chPl <- &plLoad{pl, err}
}(name)
}
wg.Wait()
close(chPl)
var out []*Plugin
for pl := range chPl {
if pl.err != nil {
logrus.Error(pl.err)
continue
}
if err := pl.pl.waitActive(); err == nil && pl.pl.implements(imp) {
out = append(out, pl.pl)
}
}
return out, nil
}