diff --git a/plugin/backend_linux.go b/plugin/backend_linux.go index 4b4fd11a65..f9396626c3 100644 --- a/plugin/backend_linux.go +++ b/plugin/backend_linux.go @@ -37,7 +37,11 @@ func (pm *Manager) Disable(name string) error { if err != nil { return err } - if err := pm.disable(p); err != nil { + pm.mu.RLock() + c := pm.cMap[p] + pm.mu.RUnlock() + + if err := pm.disable(p, c); err != nil { return err } pm.pluginEventLogger(p.GetID(), name, "disable") @@ -46,14 +50,13 @@ func (pm *Manager) Disable(name string) error { // Enable activates a plugin, which implies that they are ready to be used by containers. func (pm *Manager) Enable(name string, config *types.PluginEnableConfig) error { - p, err := pm.pluginStore.GetByName(name) if err != nil { return err } - p.TimeoutInSecs = config.Timeout - if err := pm.enable(p, false); err != nil { + c := &controller{timeoutInSecs: config.Timeout} + if err := pm.enable(p, c, false); err != nil { return err } pm.pluginEventLogger(p.GetID(), name, "enable") @@ -267,25 +270,25 @@ func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.A // Remove deletes plugin's root directory. func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error { p, err := pm.pluginStore.GetByName(name) + pm.mu.RLock() + c := pm.cMap[p] + pm.mu.RUnlock() + if err != nil { return err } if !config.ForceRemove { - p.RLock() - if p.RefCount > 0 { - p.RUnlock() + if p.GetRefCount() > 0 { return fmt.Errorf("plugin %s is in use", p.Name()) } - p.RUnlock() - if p.IsEnabled() { return fmt.Errorf("plugin %s is enabled", p.Name()) } } if p.IsEnabled() { - if err := pm.disable(p); err != nil { + if err := pm.disable(p, c); err != nil { logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err) } } diff --git a/plugin/manager.go b/plugin/manager.go index a878983308..0f6c43cdc8 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -19,7 +19,7 @@ var ( ) func (pm *Manager) restorePlugin(p *v2.Plugin) error { - p.RuntimeSourcePath = filepath.Join(pm.runRoot, p.GetID()) + p.Restore(pm.runRoot) if p.IsEnabled() { return pm.restore(p) } @@ -37,6 +37,15 @@ type Manager struct { registryService registry.Service liveRestore bool pluginEventLogger eventLogger + mu sync.RWMutex // protects cMap + cMap map[*v2.Plugin]*controller +} + +// controller represents the manager's control on a plugin. +type controller struct { + restart bool + exitChan chan bool + timeoutInSecs int } // GetManager returns the singleton plugin Manager @@ -67,7 +76,8 @@ func Init(root string, ps *store.Store, remote libcontainerd.Remote, rs registry if err != nil { return err } - if err := manager.init(); err != nil { + manager.cMap = make(map[*v2.Plugin]*controller) + if err := manager.reload(); err != nil { return err } return nil @@ -83,22 +93,27 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error { if err != nil { return err } - p.RLock() - if p.ExitChan != nil { - close(p.ExitChan) + + pm.mu.RLock() + c := pm.cMap[p] + + if c.exitChan != nil { + close(c.exitChan) } - restart := p.Restart - p.RUnlock() + restart := c.restart + pm.mu.RUnlock() + p.RemoveFromDisk() if restart { - pm.enable(p, true) + pm.enable(p, c, true) } } return nil } -func (pm *Manager) init() error { +// reload is used on daemon restarts to load the manager's state +func (pm *Manager) reload() error { dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json")) if err != nil { if os.IsNotExist(err) { @@ -117,6 +132,8 @@ func (pm *Manager) init() error { var group sync.WaitGroup group.Add(len(plugins)) for _, p := range plugins { + c := &controller{} + pm.cMap[p] = c go func(p *v2.Plugin) { defer group.Done() if err := pm.restorePlugin(p); err != nil { @@ -129,7 +146,7 @@ func (pm *Manager) init() error { if requiresManualRestore { // if liveRestore is not enabled, the plugin will be stopped now so we should enable it - if err := pm.enable(p, true); err != nil { + if err := pm.enable(p, c, true); err != nil { logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err) } } diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index bf477f4163..b1bf221fda 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -16,7 +16,7 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) -func (pm *Manager) enable(p *v2.Plugin, force bool) error { +func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error { if p.IsEnabled() && !force { return fmt.Errorf("plugin %s is already enabled", p.Name()) } @@ -24,23 +24,26 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error { if err != nil { return err } - p.Lock() - p.Restart = true - p.ExitChan = make(chan bool) - p.Unlock() + + c.restart = true + c.exitChan = make(chan bool) + + pm.mu.Lock() + pm.cMap[p] = c + pm.mu.Unlock() + if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec), attachToLog(p.GetID())); err != nil { return err } - p.PClient, err = plugins.NewClientWithTimeout("unix://"+filepath.Join(p.RuntimeSourcePath, p.GetSocket()), nil, p.TimeoutInSecs) + client, err := plugins.NewClientWithTimeout("unix://"+filepath.Join(p.GetRuntimeSourcePath(), p.GetSocket()), nil, c.timeoutInSecs) if err != nil { - p.Lock() - p.Restart = false - p.Unlock() - shutdownPlugin(p, pm.containerdClient) + c.restart = false + shutdownPlugin(p, c, pm.containerdClient) return err } + p.SetPClient(client) pm.pluginStore.SetState(p, true) pm.pluginStore.CallHandler(p) @@ -51,7 +54,7 @@ func (pm *Manager) restore(p *v2.Plugin) error { return pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID())) } -func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) { +func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.Client) { pluginID := p.GetID() err := containerdClient.Signal(pluginID, int(syscall.SIGTERM)) @@ -59,7 +62,7 @@ func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) { logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err) } else { select { - case <-p.ExitChan: + case <-c.exitChan: logrus.Debug("Clean shutdown of plugin") case <-time.After(time.Second * 10): logrus.Debug("Force shutdown plugin") @@ -70,15 +73,13 @@ func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) { } } -func (pm *Manager) disable(p *v2.Plugin) error { +func (pm *Manager) disable(p *v2.Plugin, c *controller) error { if !p.IsEnabled() { return fmt.Errorf("plugin %s is already disabled", p.Name()) } - p.Lock() - p.Restart = false - p.Unlock() - shutdownPlugin(p, pm.containerdClient) + c.restart = false + shutdownPlugin(p, c, pm.containerdClient) pm.pluginStore.SetState(p, false) return nil } @@ -87,15 +88,17 @@ func (pm *Manager) disable(p *v2.Plugin) error { func (pm *Manager) Shutdown() { plugins := pm.pluginStore.GetAll() for _, p := range plugins { + pm.mu.RLock() + c := pm.cMap[p] + pm.mu.RUnlock() + if pm.liveRestore && p.IsEnabled() { logrus.Debug("Plugin active when liveRestore is set, skipping shutdown") continue } if pm.containerdClient != nil && p.IsEnabled() { - p.Lock() - p.Restart = false - p.Unlock() - shutdownPlugin(p, pm.containerdClient) + c.restart = false + shutdownPlugin(p, c, pm.containerdClient) } } } diff --git a/plugin/manager_solaris.go b/plugin/manager_solaris.go index 7656a59ad7..72ccae72d3 100644 --- a/plugin/manager_solaris.go +++ b/plugin/manager_solaris.go @@ -7,7 +7,7 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) -func (pm *Manager) enable(p *v2.Plugin, force bool) error { +func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error { return fmt.Errorf("Not implemented") } @@ -15,7 +15,7 @@ func (pm *Manager) initSpec(p *v2.Plugin) (*specs.Spec, error) { return nil, fmt.Errorf("Not implemented") } -func (pm *Manager) disable(p *v2.Plugin) error { +func (pm *Manager) disable(p *v2.Plugin, c *controller) error { return fmt.Errorf("Not implemented") } diff --git a/plugin/manager_windows.go b/plugin/manager_windows.go index 6828018750..4469a671f7 100644 --- a/plugin/manager_windows.go +++ b/plugin/manager_windows.go @@ -9,7 +9,7 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) -func (pm *Manager) enable(p *v2.Plugin, force bool) error { +func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error { return fmt.Errorf("Not implemented") } @@ -17,7 +17,7 @@ func (pm *Manager) initSpec(p *v2.Plugin) (*specs.Spec, error) { return nil, fmt.Errorf("Not implemented") } -func (pm *Manager) disable(p *v2.Plugin) error { +func (pm *Manager) disable(p *v2.Plugin, c *controller) error { return fmt.Errorf("Not implemented") } diff --git a/plugin/store/store.go b/plugin/store/store.go index 0517f8f2c7..4f4665eb42 100644 --- a/plugin/store/store.go +++ b/plugin/store/store.go @@ -174,9 +174,7 @@ func (ps *Store) Get(name, capability string, mode int) (plugingetter.CompatPlug } p, err = ps.GetByName(fullName) if err == nil { - p.Lock() - p.RefCount += mode - p.Unlock() + p.SetRefCount(mode + p.GetRefCount()) if p.IsEnabled() { return p.FilterByCap(capability) } diff --git a/plugin/v2/plugin.go b/plugin/v2/plugin.go index 7ea115cb39..4046bf7dbe 100644 --- a/plugin/v2/plugin.go +++ b/plugin/v2/plugin.go @@ -17,15 +17,12 @@ import ( // Plugin represents an individual plugin. type Plugin struct { - sync.RWMutex - PluginObj types.Plugin `json:"plugin"` - PClient *plugins.Client `json:"-"` - RuntimeSourcePath string `json:"-"` - RefCount int `json:"-"` - Restart bool `json:"-"` - ExitChan chan bool `json:"-"` - LibRoot string `json:"-"` - TimeoutInSecs int `json:"-"` + mu sync.RWMutex + PluginObj types.Plugin `json:"plugin"` + pClient *plugins.Client + runtimeSourcePath string + refCount int + libRoot string } const defaultPluginRuntimeDestination = "/run/docker/plugins" @@ -47,14 +44,39 @@ func newPluginObj(name, id, tag string) types.Plugin { func NewPlugin(name, id, runRoot, libRoot, tag string) *Plugin { return &Plugin{ PluginObj: newPluginObj(name, id, tag), - RuntimeSourcePath: filepath.Join(runRoot, id), - LibRoot: libRoot, + runtimeSourcePath: filepath.Join(runRoot, id), + libRoot: libRoot, } } +// Restore restores the plugin +func (p *Plugin) Restore(runRoot string) { + p.runtimeSourcePath = filepath.Join(runRoot, p.GetID()) +} + +// GetRuntimeSourcePath gets the Source (host) path of the plugin socket +// This path gets bind mounted into the plugin. +func (p *Plugin) GetRuntimeSourcePath() string { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.runtimeSourcePath +} + // Client returns the plugin client. func (p *Plugin) Client() *plugins.Client { - return p.PClient + p.mu.RLock() + defer p.mu.RUnlock() + + return p.pClient +} + +// SetPClient set the plugin client. +func (p *Plugin) SetPClient(client *plugins.Client) { + p.mu.Lock() + defer p.mu.Unlock() + + p.pClient = client } // IsV1 returns true for V1 plugins and false otherwise. @@ -85,12 +107,12 @@ func (p *Plugin) FilterByCap(capability string) (*Plugin, error) { // RemoveFromDisk deletes the plugin's runtime files from disk. func (p *Plugin) RemoveFromDisk() error { - return os.RemoveAll(p.RuntimeSourcePath) + return os.RemoveAll(p.runtimeSourcePath) } // InitPlugin populates the plugin object from the plugin config file. func (p *Plugin) InitPlugin() error { - dt, err := os.Open(filepath.Join(p.LibRoot, p.PluginObj.ID, "config.json")) + dt, err := os.Open(filepath.Join(p.libRoot, p.PluginObj.ID, "config.json")) if err != nil { return err } @@ -118,7 +140,7 @@ func (p *Plugin) InitPlugin() error { } func (p *Plugin) writeSettings() error { - f, err := os.Create(filepath.Join(p.LibRoot, p.PluginObj.ID, "plugin-settings.json")) + f, err := os.Create(filepath.Join(p.libRoot, p.PluginObj.ID, "plugin-settings.json")) if err != nil { return err } @@ -129,8 +151,8 @@ func (p *Plugin) writeSettings() error { // Set is used to pass arguments to the plugin. func (p *Plugin) Set(args []string) error { - p.Lock() - defer p.Unlock() + p.mu.Lock() + defer p.mu.Unlock() if p.PluginObj.Enabled { return fmt.Errorf("cannot set on an active plugin, disable plugin before setting") @@ -218,36 +240,52 @@ next: // IsEnabled returns the active state of the plugin. func (p *Plugin) IsEnabled() bool { - p.RLock() - defer p.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.PluginObj.Enabled } // GetID returns the plugin's ID. func (p *Plugin) GetID() string { - p.RLock() - defer p.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.PluginObj.ID } // GetSocket returns the plugin socket. func (p *Plugin) GetSocket() string { - p.RLock() - defer p.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.PluginObj.Config.Interface.Socket } // GetTypes returns the interface types of a plugin. func (p *Plugin) GetTypes() []types.PluginInterfaceType { - p.RLock() - defer p.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.PluginObj.Config.Interface.Types } +// GetRefCount returns the reference count. +func (p *Plugin) GetRefCount() int { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.refCount +} + +// SetRefCount sets the reference count. +func (p *Plugin) SetRefCount(count int) { + p.mu.Lock() + defer p.mu.Unlock() + + p.refCount = count +} + // InitSpec creates an OCI spec from the plugin's config. func (p *Plugin) InitSpec(s specs.Spec, libRoot string) (*specs.Spec, error) { rootfs := filepath.Join(libRoot, p.PluginObj.ID, "rootfs") @@ -262,7 +300,7 @@ func (p *Plugin) InitSpec(s specs.Spec, libRoot string) (*specs.Spec, error) { } mounts := append(p.PluginObj.Config.Mounts, types.PluginMount{ - Source: &p.RuntimeSourcePath, + Source: &p.runtimeSourcePath, Destination: defaultPluginRuntimeDestination, Type: "bind", Options: []string{"rbind", "rshared"},