mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Share libkv store handles across datastore handles
Currently every `NewDatastore` creates a brand new libkv store handle. This change attempts to share the libkv store handle across various datastore handles which share the same scope configuration. This enables libnetwork and drivers to have different datastore handle based on the same configuration but share the same underlying libkv store handle. This is mandatory for boltdb libkv backend because no two clients can get exclusive access to boltdb file at the same time. For other backends it just avoids the overhead of having too many backend client instances Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
parent
c74538c22e
commit
3f7e26160e
2 changed files with 90 additions and 21 deletions
|
@ -5,6 +5,7 @@ import (
|
|||
"log"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/libkv"
|
||||
|
@ -56,6 +57,7 @@ type datastore struct {
|
|||
scope string
|
||||
store store.Store
|
||||
cache *cache
|
||||
cfg ScopeCfg
|
||||
}
|
||||
|
||||
// KVObject is Key/Value interface used by objects to be part of the DataStore
|
||||
|
@ -104,6 +106,12 @@ type ScopeClientCfg struct {
|
|||
Config *store.Config
|
||||
}
|
||||
|
||||
type storeTableData struct {
|
||||
refCnt int
|
||||
store store.Store
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
const (
|
||||
// LocalScope indicates to store the KV object in local datastore such as boltdb
|
||||
LocalScope = "local"
|
||||
|
@ -121,6 +129,8 @@ const (
|
|||
|
||||
var (
|
||||
defaultScopes = makeDefaultScopes()
|
||||
storeLock sync.Mutex
|
||||
storeTable = make(map[ScopeCfg]*storeTableData)
|
||||
)
|
||||
|
||||
func makeDefaultScopes() map[string]*ScopeCfg {
|
||||
|
@ -190,11 +200,7 @@ func ParseKey(key string) ([]string, error) {
|
|||
}
|
||||
|
||||
// newClient used to connect to KV Store
|
||||
func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (DataStore, error) {
|
||||
if cached && scope != LocalScope {
|
||||
return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
|
||||
}
|
||||
|
||||
func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (*datastore, error) {
|
||||
if config == nil {
|
||||
config = &store.Config{}
|
||||
}
|
||||
|
@ -213,24 +219,76 @@ func newClient(scope string, kv string, addrs string, config *store.Config, cach
|
|||
|
||||
// NewDataStore creates a new instance of LibKV data store
|
||||
func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
|
||||
if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
|
||||
c, ok := defaultScopes[scope]
|
||||
if !ok || c.Client.Provider == "" || c.Client.Address == "" {
|
||||
return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
|
||||
var (
|
||||
err error
|
||||
ds *datastore
|
||||
)
|
||||
|
||||
if !cfg.IsValid() {
|
||||
return nil, fmt.Errorf("invalid datastore configuration passed for scope %s", scope)
|
||||
}
|
||||
|
||||
storeLock.Lock()
|
||||
sdata, ok := storeTable[*cfg]
|
||||
if ok {
|
||||
sdata.refCnt++
|
||||
// If sdata already has a store nothing to do. Just
|
||||
// create a datastore handle using it and return with
|
||||
// that.
|
||||
if sdata.store != nil {
|
||||
storeLock.Unlock()
|
||||
return &datastore{scope: scope, cfg: *cfg, store: sdata.store}, nil
|
||||
}
|
||||
} else {
|
||||
// If sdata is not present create one and add ito
|
||||
// storeTable while holding the lock.
|
||||
sdata = &storeTableData{refCnt: 1}
|
||||
storeTable[*cfg] = sdata
|
||||
}
|
||||
storeLock.Unlock()
|
||||
|
||||
// We come here either because:
|
||||
//
|
||||
// 1. We just created the store table data OR
|
||||
// 2. We picked up the store table data from table but store was not initialized.
|
||||
//
|
||||
// In both cases the once function will ensure the store
|
||||
// initialization happens exactly once
|
||||
sdata.once.Do(func() {
|
||||
ds, err = newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, scope == LocalScope)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
cfg = c
|
||||
ds.cfg = *cfg
|
||||
sdata.store = ds.store
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cached bool
|
||||
if scope == LocalScope {
|
||||
cached = true
|
||||
}
|
||||
|
||||
return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
func (ds *datastore) Close() {
|
||||
storeLock.Lock()
|
||||
sdata := storeTable[ds.cfg]
|
||||
|
||||
if sdata == nil {
|
||||
storeLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
sdata.refCnt--
|
||||
if sdata.refCnt > 0 {
|
||||
storeLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
delete(storeTable, ds.cfg)
|
||||
storeLock.Unlock()
|
||||
|
||||
ds.store.Close()
|
||||
}
|
||||
|
||||
|
|
|
@ -43,9 +43,17 @@ func TestBoltdbBackend(t *testing.T) {
|
|||
|
||||
func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Config) {
|
||||
cfgOptions := []config.Option{}
|
||||
cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider))
|
||||
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url))
|
||||
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig))
|
||||
if provider != "" {
|
||||
cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider))
|
||||
}
|
||||
|
||||
if url != "" {
|
||||
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url))
|
||||
}
|
||||
|
||||
if storeConfig != nil {
|
||||
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig))
|
||||
}
|
||||
|
||||
driverOptions := options.Generic{}
|
||||
genericOption := make(map[string]interface{})
|
||||
|
@ -71,7 +79,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
|
|||
if exists, err := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, string(nw.ID()), string(ep.ID())}...)); exists || err != nil {
|
||||
t.Fatalf("Endpoint key shouldn't have been created.")
|
||||
}
|
||||
store.Close()
|
||||
ctrl.(*controller).getStore(datastore.LocalScope).Close()
|
||||
|
||||
// test restore of local store
|
||||
ctrl, err = New(cfgOptions...)
|
||||
|
@ -138,7 +146,10 @@ func TestLocalStoreLockTimeout(t *testing.T) {
|
|||
}
|
||||
defer ctrl1.Stop()
|
||||
// Use the same boltdb file without closing the previous controller
|
||||
_, err = New(cfgOptions...)
|
||||
// with a slightly altered configuration
|
||||
sCfg := &store.Config{Bucket: "testBackend", ConnectionTimeout: 1 * time.Second}
|
||||
_, err = New(append(cfgOptions[:len(cfgOptions)-1],
|
||||
config.OptionLocalKVProviderConfig(sCfg))...)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected to fail but succeeded")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue