mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
6a0bdffc1a
When a conflict is found in the volume cache, check with the driver if that volume still actually exists. If the volume doesn't exist, purge it from the cache and allow the create to happen. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
617 lines
17 KiB
Go
617 lines
17 KiB
Go
package store
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/boltdb/bolt"
|
|
"github.com/docker/docker/pkg/locker"
|
|
"github.com/docker/docker/volume"
|
|
"github.com/docker/docker/volume/drivers"
|
|
)
|
|
|
|
const (
|
|
volumeDataDir = "volumes"
|
|
volumeBucketName = "volumes"
|
|
)
|
|
|
|
type volumeMetadata struct {
|
|
Name string
|
|
Labels map[string]string
|
|
Options map[string]string
|
|
}
|
|
|
|
type volumeWrapper struct {
|
|
volume.Volume
|
|
labels map[string]string
|
|
scope string
|
|
options map[string]string
|
|
}
|
|
|
|
func (v volumeWrapper) Options() map[string]string {
|
|
options := map[string]string{}
|
|
for key, value := range v.options {
|
|
options[key] = value
|
|
}
|
|
return options
|
|
}
|
|
|
|
func (v volumeWrapper) Labels() map[string]string {
|
|
return v.labels
|
|
}
|
|
|
|
func (v volumeWrapper) Scope() string {
|
|
return v.scope
|
|
}
|
|
|
|
func (v volumeWrapper) CachedPath() string {
|
|
if vv, ok := v.Volume.(interface {
|
|
CachedPath() string
|
|
}); ok {
|
|
return vv.CachedPath()
|
|
}
|
|
return v.Volume.Path()
|
|
}
|
|
|
|
// New initializes a VolumeStore to keep
|
|
// reference counting of volumes in the system.
|
|
func New(rootPath string) (*VolumeStore, error) {
|
|
vs := &VolumeStore{
|
|
locks: &locker.Locker{},
|
|
names: make(map[string]volume.Volume),
|
|
refs: make(map[string][]string),
|
|
labels: make(map[string]map[string]string),
|
|
options: make(map[string]map[string]string),
|
|
}
|
|
|
|
if rootPath != "" {
|
|
// initialize metadata store
|
|
volPath := filepath.Join(rootPath, volumeDataDir)
|
|
if err := os.MkdirAll(volPath, 750); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dbPath := filepath.Join(volPath, "metadata.db")
|
|
|
|
var err error
|
|
vs.db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error while opening volume store metadata database")
|
|
}
|
|
|
|
// initialize volumes bucket
|
|
if err := vs.db.Update(func(tx *bolt.Tx) error {
|
|
if _, err := tx.CreateBucketIfNotExists([]byte(volumeBucketName)); err != nil {
|
|
return errors.Wrap(err, "error while setting up volume store metadata database")
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return vs, nil
|
|
}
|
|
|
|
func (s *VolumeStore) getNamed(name string) (volume.Volume, bool) {
|
|
s.globalLock.Lock()
|
|
v, exists := s.names[name]
|
|
s.globalLock.Unlock()
|
|
return v, exists
|
|
}
|
|
|
|
func (s *VolumeStore) setNamed(v volume.Volume, ref string) {
|
|
s.globalLock.Lock()
|
|
s.names[v.Name()] = v
|
|
if len(ref) > 0 {
|
|
s.refs[v.Name()] = append(s.refs[v.Name()], ref)
|
|
}
|
|
s.globalLock.Unlock()
|
|
}
|
|
|
|
// getRefs gets the list of refs for a given name
|
|
// Callers of this function are expected to hold the name lock.
|
|
func (s *VolumeStore) getRefs(name string) []string {
|
|
s.globalLock.Lock()
|
|
refs := s.refs[name]
|
|
s.globalLock.Unlock()
|
|
return refs
|
|
}
|
|
|
|
// Purge allows the cleanup of internal data on docker in case
|
|
// the internal data is out of sync with volumes driver plugins.
|
|
func (s *VolumeStore) Purge(name string) {
|
|
s.globalLock.Lock()
|
|
delete(s.names, name)
|
|
delete(s.refs, name)
|
|
delete(s.labels, name)
|
|
delete(s.options, name)
|
|
s.globalLock.Unlock()
|
|
}
|
|
|
|
// VolumeStore is a struct that stores the list of volumes available and keeps track of their usage counts
|
|
type VolumeStore struct {
|
|
locks *locker.Locker
|
|
globalLock sync.Mutex
|
|
// names stores the volume name -> driver name relationship.
|
|
// This is used for making lookups faster so we don't have to probe all drivers
|
|
names map[string]volume.Volume
|
|
// refs stores the volume name and the list of things referencing it
|
|
refs map[string][]string
|
|
// labels stores volume labels for each volume
|
|
labels map[string]map[string]string
|
|
// options stores volume options for each volume
|
|
options map[string]map[string]string
|
|
db *bolt.DB
|
|
}
|
|
|
|
// List proxies to all registered volume drivers to get the full list of volumes
|
|
// If a driver returns a volume that has name which conflicts with another volume from a different driver,
|
|
// the first volume is chosen and the conflicting volume is dropped.
|
|
func (s *VolumeStore) List() ([]volume.Volume, []string, error) {
|
|
vols, warnings, err := s.list()
|
|
if err != nil {
|
|
return nil, nil, &OpErr{Err: err, Op: "list"}
|
|
}
|
|
var out []volume.Volume
|
|
|
|
for _, v := range vols {
|
|
name := normaliseVolumeName(v.Name())
|
|
|
|
s.locks.Lock(name)
|
|
storedV, exists := s.getNamed(name)
|
|
// Note: it's not safe to populate the cache here because the volume may have been
|
|
// deleted before we acquire a lock on its name
|
|
if exists && storedV.DriverName() != v.DriverName() {
|
|
logrus.Warnf("Volume name %s already exists for driver %s, not including volume returned by %s", v.Name(), storedV.DriverName(), v.DriverName())
|
|
s.locks.Unlock(v.Name())
|
|
continue
|
|
}
|
|
|
|
out = append(out, v)
|
|
s.locks.Unlock(v.Name())
|
|
}
|
|
return out, warnings, nil
|
|
}
|
|
|
|
// list goes through each volume driver and asks for its list of volumes.
|
|
func (s *VolumeStore) list() ([]volume.Volume, []string, error) {
|
|
var (
|
|
ls []volume.Volume
|
|
warnings []string
|
|
)
|
|
|
|
drivers, err := volumedrivers.GetAllDrivers()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
type vols struct {
|
|
vols []volume.Volume
|
|
err error
|
|
driverName string
|
|
}
|
|
chVols := make(chan vols, len(drivers))
|
|
|
|
for _, vd := range drivers {
|
|
go func(d volume.Driver) {
|
|
vs, err := d.List()
|
|
if err != nil {
|
|
chVols <- vols{driverName: d.Name(), err: &OpErr{Err: err, Name: d.Name(), Op: "list"}}
|
|
return
|
|
}
|
|
for i, v := range vs {
|
|
vs[i] = volumeWrapper{v, s.labels[v.Name()], d.Scope(), s.options[v.Name()]}
|
|
}
|
|
|
|
chVols <- vols{vols: vs}
|
|
}(vd)
|
|
}
|
|
|
|
badDrivers := make(map[string]struct{})
|
|
for i := 0; i < len(drivers); i++ {
|
|
vs := <-chVols
|
|
|
|
if vs.err != nil {
|
|
warnings = append(warnings, vs.err.Error())
|
|
badDrivers[vs.driverName] = struct{}{}
|
|
logrus.Warn(vs.err)
|
|
}
|
|
ls = append(ls, vs.vols...)
|
|
}
|
|
|
|
if len(badDrivers) > 0 {
|
|
for _, v := range s.names {
|
|
if _, exists := badDrivers[v.DriverName()]; exists {
|
|
ls = append(ls, v)
|
|
}
|
|
}
|
|
}
|
|
return ls, warnings, nil
|
|
}
|
|
|
|
// CreateWithRef creates a volume with the given name and driver and stores the ref
|
|
// This ensures there's no race between creating a volume and then storing a reference.
|
|
func (s *VolumeStore) CreateWithRef(name, driverName, ref string, opts, labels map[string]string) (volume.Volume, error) {
|
|
name = normaliseVolumeName(name)
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
v, err := s.create(name, driverName, opts, labels)
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "create"}
|
|
}
|
|
|
|
s.setNamed(v, ref)
|
|
return v, nil
|
|
}
|
|
|
|
// Create creates a volume with the given name and driver.
|
|
// This is just like CreateWithRef() except we don't store a reference while holding the lock.
|
|
func (s *VolumeStore) Create(name, driverName string, opts, labels map[string]string) (volume.Volume, error) {
|
|
return s.CreateWithRef(name, driverName, "", opts, labels)
|
|
}
|
|
|
|
// checkConflict checks the local cache for name collisions with the passed in name,
|
|
// for existing volumes with the same name but in a different driver.
|
|
// This is used by `Create` as a best effort to prevent name collisions for volumes.
|
|
// If a matching volume is found that is not a conflict that is returned so the caller
|
|
// does not need to perform an additional lookup.
|
|
// When no matching volume is found, both returns will be nil
|
|
//
|
|
// Note: This does not probe all the drivers for name collisions because v1 plugins
|
|
// are very slow, particularly if the plugin is down, and cause other issues,
|
|
// particularly around locking the store.
|
|
// TODO(cpuguy83): With v2 plugins this shouldn't be a problem. Could also potentially
|
|
// use a connect timeout for this kind of check to ensure we aren't blocking for a
|
|
// long time.
|
|
func (s *VolumeStore) checkConflict(name, driverName string) (volume.Volume, error) {
|
|
// check the local cache
|
|
v, _ := s.getNamed(name)
|
|
if v != nil {
|
|
vDriverName := v.DriverName()
|
|
if driverName != "" && vDriverName != driverName {
|
|
// we have what looks like a conflict
|
|
// let's see if there are existing refs to this volume, if so we don't need
|
|
// to go any further since we can assume the volume is legit.
|
|
if len(s.getRefs(name)) > 0 {
|
|
return nil, errors.Wrapf(errNameConflict, "driver '%s' already has volume '%s'", vDriverName, name)
|
|
}
|
|
|
|
// looks like there is a conflict, but nothing is referencing it...
|
|
// let's check if the found volume ref
|
|
// is stale by checking with the driver if it still exists
|
|
vd, err := volumedrivers.GetDriver(vDriverName)
|
|
if err != nil {
|
|
// play it safe and return the error
|
|
// TODO(cpuguy83): maybe when when v2 plugins are ubiquitous, we should
|
|
// just purge this from the cache
|
|
return nil, errors.Wrapf(errNameConflict, "found reference to volume '%s' in driver '%s', but got an error while checking the driver: %v", name, vDriverName, err)
|
|
}
|
|
|
|
// now check if it still exists in the driver
|
|
v2, err := vd.Get(name)
|
|
err = errors.Cause(err)
|
|
if err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
// got some error related to the driver connectivity
|
|
// play it safe and return the error
|
|
// TODO(cpuguy83): When when v2 plugins are ubiquitous, maybe we should
|
|
// just purge this from the cache
|
|
return nil, errors.Wrapf(errNameConflict, "found reference to volume '%s' in driver '%s', but got an error while checking the driver: %v", name, vDriverName, err)
|
|
}
|
|
|
|
// a driver can return whatever it wants, so let's make sure this is nil
|
|
if v2 == nil {
|
|
// purge this reference from the cache
|
|
s.Purge(name)
|
|
return nil, nil
|
|
}
|
|
}
|
|
if v2 != nil {
|
|
return nil, errors.Wrapf(errNameConflict, "driver '%s' already has volume '%s'", vDriverName, name)
|
|
}
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// create asks the given driver to create a volume with the name/opts.
|
|
// If a volume with the name is already known, it will ask the stored driver for the volume.
|
|
// If the passed in driver name does not match the driver name which is stored
|
|
// for the given volume name, an error is returned after checking if the reference is stale.
|
|
// If the reference is stale, it will be purged and this create can continue.
|
|
// It is expected that callers of this function hold any necessary locks.
|
|
func (s *VolumeStore) create(name, driverName string, opts, labels map[string]string) (volume.Volume, error) {
|
|
// Validate the name in a platform-specific manner
|
|
valid, err := volume.IsVolumeNameValid(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !valid {
|
|
return nil, &OpErr{Err: errInvalidName, Name: name, Op: "create"}
|
|
}
|
|
|
|
v, err := s.checkConflict(name, driverName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if v != nil {
|
|
return v, nil
|
|
}
|
|
|
|
// Since there isn't a specified driver name, let's see if any of the existing drivers have this volume name
|
|
if driverName == "" {
|
|
v, _ := s.getVolume(name)
|
|
if v != nil {
|
|
return v, nil
|
|
}
|
|
}
|
|
|
|
vd, err := volumedrivers.CreateDriver(driverName)
|
|
|
|
if err != nil {
|
|
return nil, &OpErr{Op: "create", Name: name, Err: err}
|
|
}
|
|
|
|
logrus.Debugf("Registering new volume reference: driver %q, name %q", vd.Name(), name)
|
|
|
|
if v, _ := vd.Get(name); v != nil {
|
|
return v, nil
|
|
}
|
|
v, err = vd.Create(name, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.globalLock.Lock()
|
|
s.labels[name] = labels
|
|
s.options[name] = opts
|
|
s.globalLock.Unlock()
|
|
|
|
if s.db != nil {
|
|
metadata := &volumeMetadata{
|
|
Name: name,
|
|
Labels: labels,
|
|
Options: opts,
|
|
}
|
|
|
|
volData, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(volumeBucketName))
|
|
err := b.Put([]byte(name), volData)
|
|
return err
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "error while persisting volume metadata")
|
|
}
|
|
}
|
|
|
|
return volumeWrapper{v, labels, vd.Scope(), opts}, nil
|
|
}
|
|
|
|
// GetWithRef gets a volume with the given name from the passed in driver and stores the ref
|
|
// This is just like Get(), but we store the reference while holding the lock.
|
|
// This makes sure there are no races between checking for the existence of a volume and adding a reference for it
|
|
func (s *VolumeStore) GetWithRef(name, driverName, ref string) (volume.Volume, error) {
|
|
name = normaliseVolumeName(name)
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
vd, err := volumedrivers.GetDriver(driverName)
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "get"}
|
|
}
|
|
|
|
v, err := vd.Get(name)
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "get"}
|
|
}
|
|
|
|
s.setNamed(v, ref)
|
|
|
|
return volumeWrapper{v, s.labels[name], vd.Scope(), s.options[name]}, nil
|
|
}
|
|
|
|
// Get looks if a volume with the given name exists and returns it if so
|
|
func (s *VolumeStore) Get(name string) (volume.Volume, error) {
|
|
name = normaliseVolumeName(name)
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
v, err := s.getVolume(name)
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "get"}
|
|
}
|
|
s.setNamed(v, "")
|
|
return v, nil
|
|
}
|
|
|
|
// getVolume requests the volume, if the driver info is stored it just accesses that driver,
|
|
// if the driver is unknown it probes all drivers until it finds the first volume with that name.
|
|
// it is expected that callers of this function hold any necessary locks
|
|
func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
|
|
labels := map[string]string{}
|
|
options := map[string]string{}
|
|
|
|
if s.db != nil {
|
|
// get meta
|
|
if err := s.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket([]byte(volumeBucketName))
|
|
data := b.Get([]byte(name))
|
|
|
|
if string(data) == "" {
|
|
return nil
|
|
}
|
|
|
|
var meta volumeMetadata
|
|
buf := bytes.NewBuffer(data)
|
|
|
|
if err := json.NewDecoder(buf).Decode(&meta); err != nil {
|
|
return err
|
|
}
|
|
labels = meta.Labels
|
|
options = meta.Options
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
logrus.Debugf("Getting volume reference for name: %s", name)
|
|
s.globalLock.Lock()
|
|
v, exists := s.names[name]
|
|
s.globalLock.Unlock()
|
|
if exists {
|
|
vd, err := volumedrivers.GetDriver(v.DriverName())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
vol, err := vd.Get(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return volumeWrapper{vol, labels, vd.Scope(), options}, nil
|
|
}
|
|
|
|
logrus.Debugf("Probing all drivers for volume with name: %s", name)
|
|
drivers, err := volumedrivers.GetAllDrivers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, d := range drivers {
|
|
v, err := d.Get(name)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
return volumeWrapper{v, labels, d.Scope(), options}, nil
|
|
}
|
|
return nil, errNoSuchVolume
|
|
}
|
|
|
|
// Remove removes the requested volume. A volume is not removed if it has any refs
|
|
func (s *VolumeStore) Remove(v volume.Volume) error {
|
|
name := normaliseVolumeName(v.Name())
|
|
s.locks.Lock(name)
|
|
defer s.locks.Unlock(name)
|
|
|
|
refs := s.getRefs(name)
|
|
if len(refs) > 0 {
|
|
return &OpErr{Err: errVolumeInUse, Name: v.Name(), Op: "remove", Refs: refs}
|
|
}
|
|
|
|
vd, err := volumedrivers.RemoveDriver(v.DriverName())
|
|
if err != nil {
|
|
return &OpErr{Err: err, Name: vd.Name(), Op: "remove"}
|
|
}
|
|
|
|
logrus.Debugf("Removing volume reference: driver %s, name %s", v.DriverName(), name)
|
|
vol := unwrapVolume(v)
|
|
if err := vd.Remove(vol); err != nil {
|
|
return &OpErr{Err: err, Name: name, Op: "remove"}
|
|
}
|
|
|
|
s.Purge(name)
|
|
return nil
|
|
}
|
|
|
|
// Dereference removes the specified reference to the volume
|
|
func (s *VolumeStore) Dereference(v volume.Volume, ref string) {
|
|
s.locks.Lock(v.Name())
|
|
defer s.locks.Unlock(v.Name())
|
|
|
|
s.globalLock.Lock()
|
|
defer s.globalLock.Unlock()
|
|
var refs []string
|
|
|
|
for _, r := range s.refs[v.Name()] {
|
|
if r != ref {
|
|
refs = append(refs, r)
|
|
}
|
|
}
|
|
s.refs[v.Name()] = refs
|
|
}
|
|
|
|
// Refs gets the current list of refs for the given volume
|
|
func (s *VolumeStore) Refs(v volume.Volume) []string {
|
|
s.locks.Lock(v.Name())
|
|
defer s.locks.Unlock(v.Name())
|
|
|
|
refs := s.getRefs(v.Name())
|
|
refsOut := make([]string, len(refs))
|
|
copy(refsOut, refs)
|
|
return refsOut
|
|
}
|
|
|
|
// FilterByDriver returns the available volumes filtered by driver name
|
|
func (s *VolumeStore) FilterByDriver(name string) ([]volume.Volume, error) {
|
|
vd, err := volumedrivers.GetDriver(name)
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "list"}
|
|
}
|
|
ls, err := vd.List()
|
|
if err != nil {
|
|
return nil, &OpErr{Err: err, Name: name, Op: "list"}
|
|
}
|
|
for i, v := range ls {
|
|
options := map[string]string{}
|
|
for key, value := range s.options[v.Name()] {
|
|
options[key] = value
|
|
}
|
|
ls[i] = volumeWrapper{v, s.labels[v.Name()], vd.Scope(), options}
|
|
}
|
|
return ls, nil
|
|
}
|
|
|
|
// FilterByUsed returns the available volumes filtered by if they are in use or not.
|
|
// `used=true` returns only volumes that are being used, while `used=false` returns
|
|
// only volumes that are not being used.
|
|
func (s *VolumeStore) FilterByUsed(vols []volume.Volume, used bool) []volume.Volume {
|
|
return s.filter(vols, func(v volume.Volume) bool {
|
|
s.locks.Lock(v.Name())
|
|
l := len(s.getRefs(v.Name()))
|
|
s.locks.Unlock(v.Name())
|
|
if (used && l > 0) || (!used && l == 0) {
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
}
|
|
|
|
// filterFunc defines a function to allow filter volumes in the store
|
|
type filterFunc func(vol volume.Volume) bool
|
|
|
|
// filter returns the available volumes filtered by a filterFunc function
|
|
func (s *VolumeStore) filter(vols []volume.Volume, f filterFunc) []volume.Volume {
|
|
var ls []volume.Volume
|
|
for _, v := range vols {
|
|
if f(v) {
|
|
ls = append(ls, v)
|
|
}
|
|
}
|
|
return ls
|
|
}
|
|
|
|
func unwrapVolume(v volume.Volume) volume.Volume {
|
|
if vol, ok := v.(volumeWrapper); ok {
|
|
return vol.Volume
|
|
}
|
|
|
|
return v
|
|
}
|