Mirror of https://github.com/moby/moby.git (synced 2022-11-09 12:21:53 -05:00)
Commit 70b76266b5
TestPluginTrustedInstall revealed a race in the plugin shutdown logic, where the exit channel signal was sent even before the propagated mounts were unmounted. If the same plugin was enabled again, it would try to set up propagated mounts *before* they were unmounted, resulting in errors. This change fixes the behavior by waiting until the unmount completes on disable before marking the plugin as disabled.

Signed-off-by: Anusha Ragunathan <anusha.ragunathan@docker.com>
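The fix described above amounts to an ordering guarantee: the exit notification must not be delivered until the plugin's propagated mounts have been torn down, so a subsequent enable can never observe a half-unmounted state. A minimal, self-contained sketch of that ordering follows; the unmountAll and markDisabled helpers are hypothetical stand-ins for the manager's real cleanup and state-update code, not the actual moby API.

package main

import "fmt"

// controller carries the channel that signals a plugin's shutdown is
// fully finished (process exited and mounts cleaned up).
type controller struct {
	exitChan chan bool
}

// handleExit mirrors the corrected shutdown ordering: unmount first,
// then close the channel that disable is waiting on.
func handleExit(c *controller, unmountAll func() error) {
	if err := unmountAll(); err != nil { // hypothetical cleanup helper
		fmt.Println("warning: unmount failed:", err)
	}
	if c.exitChan != nil {
		close(c.exitChan) // only now may waiters proceed
	}
}

// disable blocks until handleExit has finished, so the plugin is only
// marked disabled once its mounts are gone.
func disable(c *controller, markDisabled func()) {
	<-c.exitChan
	markDisabled() // hypothetical state update
}

func main() {
	c := &controller{exitChan: make(chan bool)}
	go handleExit(c, func() error { return nil })
	disable(c, func() { fmt.Println("plugin disabled") })
}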
372 lines
10 KiB
Go

package plugin

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"reflect"
	"regexp"
	"sort"
	"strings"
	"sync"

	"github.com/Sirupsen/logrus"
	"github.com/docker/distribution/reference"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/image"
	"github.com/docker/docker/layer"
	"github.com/docker/docker/libcontainerd"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/mount"
	"github.com/docker/docker/plugin/v2"
	"github.com/docker/docker/registry"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
)

const configFileName = "config.json"
const rootFSFileName = "rootfs"

var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)

func (pm *Manager) restorePlugin(p *v2.Plugin) error {
	if p.IsEnabled() {
		return pm.restore(p)
	}
	return nil
}

type eventLogger func(id, name, action string)

// ManagerConfig defines the configuration needed to start a new manager.
type ManagerConfig struct {
	Store              *Store // TODO: remove
	Executor           libcontainerd.Remote
	RegistryService    registry.Service
	LiveRestoreEnabled bool // TODO: remove
	LogPluginEvent     eventLogger
	Root               string
	ExecRoot           string
}

// Manager controls the plugin subsystem.
type Manager struct {
	config           ManagerConfig
	mu               sync.RWMutex // protects cMap
	muGC             sync.RWMutex // protects blobstore deletions
	cMap             map[*v2.Plugin]*controller
	containerdClient libcontainerd.Client
	blobStore        *basicBlobStore
}

// controller represents the manager's control on a plugin.
type controller struct {
	restart       bool
	exitChan      chan bool
	timeoutInSecs int
}

// pluginRegistryService ensures that all resolved repositories
// are of the plugin class.
type pluginRegistryService struct {
	registry.Service
}

func (s pluginRegistryService) ResolveRepository(name reference.Named) (repoInfo *registry.RepositoryInfo, err error) {
	repoInfo, err = s.Service.ResolveRepository(name)
	if repoInfo != nil {
		repoInfo.Class = "plugin"
	}
	return
}

// NewManager returns a new plugin manager.
func NewManager(config ManagerConfig) (*Manager, error) {
	if config.RegistryService != nil {
		config.RegistryService = pluginRegistryService{config.RegistryService}
	}
	manager := &Manager{
		config: config,
	}
	if err := os.MkdirAll(manager.config.Root, 0700); err != nil {
		return nil, errors.Wrapf(err, "failed to mkdir %v", manager.config.Root)
	}
	if err := os.MkdirAll(manager.config.ExecRoot, 0700); err != nil {
		return nil, errors.Wrapf(err, "failed to mkdir %v", manager.config.ExecRoot)
	}
	if err := os.MkdirAll(manager.tmpDir(), 0700); err != nil {
		return nil, errors.Wrapf(err, "failed to mkdir %v", manager.tmpDir())
	}
	var err error
	manager.containerdClient, err = config.Executor.Client(manager) // todo: move to another struct
	if err != nil {
		return nil, errors.Wrap(err, "failed to create containerd client")
	}
	manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs"))
	if err != nil {
		return nil, err
	}

	manager.cMap = make(map[*v2.Plugin]*controller)
	if err := manager.reload(); err != nil {
		return nil, errors.Wrap(err, "failed to restore plugins")
	}
	return manager, nil
}

func (pm *Manager) tmpDir() string {
	return filepath.Join(pm.config.Root, "tmp")
}

// StateChanged updates plugin internals using libcontainerd events.
func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
	logrus.Debugf("plugin state changed %s %#v", id, e)

	switch e.State {
	case libcontainerd.StateExit:
		p, err := pm.config.Store.GetV2Plugin(id)
		if err != nil {
			return err
		}

		os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))

		// Unmount the propagated mounts before signaling exit, so that a
		// subsequent enable of the same plugin never races with a mount
		// that is still being torn down.
		if p.PropagatedMount != "" {
			if err := mount.Unmount(p.PropagatedMount); err != nil {
				logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err)
			}
			propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")
			if err := mount.Unmount(propRoot); err != nil {
				logrus.Warnf("Could not unmount %s: %v", propRoot, err)
			}
		}

		pm.mu.RLock()
		c := pm.cMap[p]
		if c.exitChan != nil {
			close(c.exitChan)
		}
		restart := c.restart
		pm.mu.RUnlock()

		if restart {
			pm.enable(p, c, true)
		}
	}

	return nil
}

func (pm *Manager) reload() error { // todo: restore
	dir, err := ioutil.ReadDir(pm.config.Root)
	if err != nil {
		return errors.Wrapf(err, "failed to read %v", pm.config.Root)
	}
	plugins := make(map[string]*v2.Plugin)
	for _, v := range dir {
		if validFullID.MatchString(v.Name()) {
			p, err := pm.loadPlugin(v.Name())
			if err != nil {
				return err
			}
			plugins[p.GetID()] = p
		}
	}

	pm.config.Store.SetAll(plugins)

	var wg sync.WaitGroup
	wg.Add(len(plugins))
	for _, p := range plugins {
		c := &controller{} // todo: remove this
		pm.cMap[p] = c
		go func(p *v2.Plugin) {
			defer wg.Done()
			if err := pm.restorePlugin(p); err != nil {
				logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
				return
			}

			if p.Rootfs != "" {
				p.Rootfs = filepath.Join(pm.config.Root, p.PluginObj.ID, "rootfs")
			}

			// We should only enable rootfs propagation for certain plugin types that need it.
			for _, typ := range p.PluginObj.Config.Interface.Types {
				if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") {
					if p.PluginObj.Config.PropagatedMount != "" {
						propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")

						// check if we need to migrate an older propagated mount from before
						// these mounts were stored outside the plugin rootfs
						if _, err := os.Stat(propRoot); os.IsNotExist(err) {
							if _, err := os.Stat(p.PropagatedMount); err == nil {
								// make sure nothing is mounted here
								// don't care about errors
								mount.Unmount(p.PropagatedMount)
								if err := os.Rename(p.PropagatedMount, propRoot); err != nil {
									logrus.WithError(err).WithField("dir", propRoot).Error("error migrating propagated mount storage")
								}
								if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
									logrus.WithError(err).WithField("dir", p.PropagatedMount).Error("error migrating propagated mount storage")
								}
							}
						}

						if err := os.MkdirAll(propRoot, 0755); err != nil {
							logrus.Errorf("failed to create PropagatedMount directory at %s: %v", propRoot, err)
						}
						// TODO: sanitize PropagatedMount and prevent breakout
						p.PropagatedMount = filepath.Join(p.Rootfs, p.PluginObj.Config.PropagatedMount)
						if err := os.MkdirAll(p.PropagatedMount, 0755); err != nil {
							logrus.Errorf("failed to create PropagatedMount directory at %s: %v", p.PropagatedMount, err)
							return
						}
					}
				}
			}

			pm.save(p)
			requiresManualRestore := !pm.config.LiveRestoreEnabled && p.IsEnabled()

			if requiresManualRestore {
				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
				if err := pm.enable(p, c, true); err != nil {
					logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
				}
			}
		}(p)
	}
	wg.Wait()
	return nil
}

func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
	p := filepath.Join(pm.config.Root, id, configFileName)
	dt, err := ioutil.ReadFile(p)
	if err != nil {
		return nil, errors.Wrapf(err, "error reading %v", p)
	}
	var plugin v2.Plugin
	if err := json.Unmarshal(dt, &plugin); err != nil {
		return nil, errors.Wrapf(err, "error decoding %v", p)
	}
	return &plugin, nil
}

func (pm *Manager) save(p *v2.Plugin) error {
	pluginJSON, err := json.Marshal(p)
	if err != nil {
		return errors.Wrap(err, "failed to marshal plugin json")
	}
	if err := ioutils.AtomicWriteFile(filepath.Join(pm.config.Root, p.GetID(), configFileName), pluginJSON, 0600); err != nil {
		return errors.Wrap(err, "failed to write plugin json atomically")
	}
	return nil
}

// GC cleans up unreferenced blobs. It is recommended to run this in a goroutine.
func (pm *Manager) GC() {
	pm.muGC.Lock()
	defer pm.muGC.Unlock()

	whitelist := make(map[digest.Digest]struct{})
	for _, p := range pm.config.Store.GetAll() {
		whitelist[p.Config] = struct{}{}
		for _, b := range p.Blobsums {
			whitelist[b] = struct{}{}
		}
	}

	pm.blobStore.gc(whitelist)
}

type logHook struct{ id string }

func (logHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (l logHook) Fire(entry *logrus.Entry) error {
	entry.Data = logrus.Fields{"plugin": l.id}
	return nil
}

func attachToLog(id string) func(libcontainerd.IOPipe) error {
	return func(iop libcontainerd.IOPipe) error {
		iop.Stdin.Close()

		logger := logrus.New()
		logger.Hooks.Add(logHook{id})
		// TODO: cache writer per id
		w := logger.Writer()
		go func() {
			io.Copy(w, iop.Stdout)
		}()
		go func() {
			// TODO: update logrus and use logger.WriterLevel
			io.Copy(w, iop.Stderr)
		}()
		return nil
	}
}

func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error {
	if !isEqual(requiredPrivileges, privileges, isEqualPrivilege) {
		return errors.New("incorrect privileges")
	}

	return nil
}

func isEqual(arrOne, arrOther types.PluginPrivileges, compare func(x, y types.PluginPrivilege) bool) bool {
	if len(arrOne) != len(arrOther) {
		return false
	}

	sort.Sort(arrOne)
	sort.Sort(arrOther)

	for i := 0; i < arrOne.Len(); i++ {
		if !compare(arrOne[i], arrOther[i]) {
			return false
		}
	}

	return true
}

func isEqualPrivilege(a, b types.PluginPrivilege) bool {
	if a.Name != b.Name {
		return false
	}

	return reflect.DeepEqual(a.Value, b.Value)
}

func configToRootFS(c []byte) (*image.RootFS, error) {
	var pluginConfig types.PluginConfig
	if err := json.Unmarshal(c, &pluginConfig); err != nil {
		return nil, err
	}
	// validation for empty rootfs is in distribution code
	if pluginConfig.Rootfs == nil {
		return nil, nil
	}

	return rootFSFromPlugin(pluginConfig.Rootfs), nil
}

func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS {
	rootFS := image.RootFS{
		Type:    pluginfs.Type,
		DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)),
	}
	for i := range pluginfs.DiffIds {
		rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i])
	}

	return &rootFS
}