mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
1272f90eae
Since the datastore interface is common for persistent and non-persistent objects we need to provide the same kind of sequencing and atomicity guarantess to non-persistent data operations as we do for persistent operations. So added sequencing and atomicity checks in the data cache layer. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
177 lines
3.4 KiB
Go
177 lines
3.4 KiB
Go
package datastore
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/docker/libkv/store"
|
|
)
|
|
|
|
type kvMap map[string]KVObject
|
|
|
|
type cache struct {
|
|
sync.Mutex
|
|
kmm map[string]kvMap
|
|
ds *datastore
|
|
}
|
|
|
|
func newCache(ds *datastore) *cache {
|
|
return &cache{kmm: make(map[string]kvMap), ds: ds}
|
|
}
|
|
|
|
func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
|
|
var err error
|
|
|
|
c.Lock()
|
|
keyPrefix := Key(kvObject.KeyPrefix()...)
|
|
kmap, ok := c.kmm[keyPrefix]
|
|
c.Unlock()
|
|
|
|
if ok {
|
|
return kmap, nil
|
|
}
|
|
|
|
kmap = kvMap{}
|
|
|
|
// Bail out right away if the kvObject does not implement KVConstructor
|
|
ctor, ok := kvObject.(KVConstructor)
|
|
if !ok {
|
|
return nil, fmt.Errorf("error while populating kmap, object does not implement KVConstructor interface")
|
|
}
|
|
|
|
kvList, err := c.ds.store.List(keyPrefix)
|
|
if err != nil {
|
|
if err == store.ErrKeyNotFound {
|
|
// If the store doesn't have anything then there is nothing to
|
|
// populate in the cache. Just bail out.
|
|
goto out
|
|
}
|
|
|
|
return nil, fmt.Errorf("error while populating kmap: %v", err)
|
|
}
|
|
|
|
for _, kvPair := range kvList {
|
|
// Ignore empty kvPair values
|
|
if len(kvPair.Value) == 0 {
|
|
continue
|
|
}
|
|
|
|
dstO := ctor.New()
|
|
err = dstO.SetValue(kvPair.Value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Make sure the object has a correct view of the DB index in
|
|
// case we need to modify it and update the DB.
|
|
dstO.SetIndex(kvPair.LastIndex)
|
|
|
|
kmap[Key(dstO.Key()...)] = dstO
|
|
}
|
|
|
|
out:
|
|
// There may multiple go routines racing to fill the
|
|
// cache. The one which places the kmap in c.kmm first
|
|
// wins. The others should just use what the first populated.
|
|
c.Lock()
|
|
kmapNew, ok := c.kmm[keyPrefix]
|
|
if ok {
|
|
c.Unlock()
|
|
return kmapNew, nil
|
|
}
|
|
|
|
c.kmm[keyPrefix] = kmap
|
|
c.Unlock()
|
|
|
|
return kmap, nil
|
|
}
|
|
|
|
func (c *cache) add(kvObject KVObject, atomic bool) error {
|
|
kmap, err := c.kmap(kvObject)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Lock()
|
|
// If atomic is true, cache needs to maintain its own index
|
|
// for atomicity and the add needs to be atomic.
|
|
if atomic {
|
|
if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
|
|
if prev.Index() != kvObject.Index() {
|
|
c.Unlock()
|
|
return ErrKeyModified
|
|
}
|
|
}
|
|
|
|
// Increment index
|
|
index := kvObject.Index()
|
|
index++
|
|
kvObject.SetIndex(index)
|
|
}
|
|
|
|
kmap[Key(kvObject.Key()...)] = kvObject
|
|
c.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (c *cache) del(kvObject KVObject, atomic bool) error {
|
|
kmap, err := c.kmap(kvObject)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Lock()
|
|
// If atomic is true, cache needs to maintain its own index
|
|
// for atomicity and del needs to be atomic.
|
|
if atomic {
|
|
if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
|
|
if prev.Index() != kvObject.Index() {
|
|
c.Unlock()
|
|
return ErrKeyModified
|
|
}
|
|
}
|
|
}
|
|
|
|
delete(kmap, Key(kvObject.Key()...))
|
|
c.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (c *cache) get(key string, kvObject KVObject) error {
|
|
kmap, err := c.kmap(kvObject)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
o, ok := kmap[Key(kvObject.Key()...)]
|
|
if !ok {
|
|
return ErrKeyNotFound
|
|
}
|
|
|
|
ctor, ok := o.(KVConstructor)
|
|
if !ok {
|
|
return fmt.Errorf("kvobject does not implement KVConstructor interface. could not get object")
|
|
}
|
|
|
|
return ctor.CopyTo(kvObject)
|
|
}
|
|
|
|
func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
|
|
kmap, err := c.kmap(kvObject)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
var kvol []KVObject
|
|
for _, v := range kmap {
|
|
kvol = append(kvol, v)
|
|
}
|
|
|
|
return kvol, nil
|
|
}
|