mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Add local datastore to persist states of LocalScope network
Signed-off-by: Chun Chen <ramichen@tencent.com>
This commit is contained in:
parent
56e3c1e9d5
commit
8babc3d4d3
20 changed files with 266 additions and 152 deletions
|
@ -70,6 +70,11 @@ func (h *Handle) Exists() bool {
|
|||
return h.dbExists
|
||||
}
|
||||
|
||||
// DataScope method returns the storage scope of the datastore
|
||||
func (h *Handle) DataScope() datastore.DataScope {
|
||||
return datastore.GlobalScope
|
||||
}
|
||||
|
||||
func (h *Handle) watchForChanges() error {
|
||||
h.Lock()
|
||||
store := h.store
|
||||
|
|
|
@ -83,11 +83,11 @@ func processConfig(cfg *config.Config) []config.Option {
|
|||
if cfg.Daemon.Labels != nil {
|
||||
options = append(options, config.OptionLabels(cfg.Daemon.Labels))
|
||||
}
|
||||
if strings.TrimSpace(cfg.Datastore.Client.Provider) != "" {
|
||||
options = append(options, config.OptionKVProvider(cfg.Datastore.Client.Provider))
|
||||
if strings.TrimSpace(cfg.GlobalStore.Client.Provider) != "" {
|
||||
options = append(options, config.OptionKVProvider(cfg.GlobalStore.Client.Provider))
|
||||
}
|
||||
if strings.TrimSpace(cfg.Datastore.Client.Address) != "" {
|
||||
options = append(options, config.OptionKVProviderURL(cfg.Datastore.Client.Address))
|
||||
if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" {
|
||||
options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address))
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
|
|
@ -5,14 +5,15 @@ import (
|
|||
|
||||
"github.com/BurntSushi/toml"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libnetwork/netlabel"
|
||||
)
|
||||
|
||||
// Config encapsulates configurations of various Libnetwork components
|
||||
type Config struct {
|
||||
Daemon DaemonCfg
|
||||
Cluster ClusterCfg
|
||||
Datastore DatastoreCfg
|
||||
Daemon DaemonCfg
|
||||
Cluster ClusterCfg
|
||||
GlobalStore, LocalStore DatastoreCfg
|
||||
}
|
||||
|
||||
// DaemonCfg represents libnetwork core configuration
|
||||
|
@ -41,6 +42,7 @@ type DatastoreCfg struct {
|
|||
type DatastoreClientCfg struct {
|
||||
Provider string
|
||||
Address string
|
||||
Config *store.Config
|
||||
}
|
||||
|
||||
// ParseConfig parses the libnetwork configuration file
|
||||
|
@ -94,7 +96,7 @@ func OptionLabels(labels []string) Option {
|
|||
func OptionKVProvider(provider string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionKVProvider: %s", provider)
|
||||
c.Datastore.Client.Provider = strings.TrimSpace(provider)
|
||||
c.GlobalStore.Client.Provider = strings.TrimSpace(provider)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,7 +104,7 @@ func OptionKVProvider(provider string) Option {
|
|||
func OptionKVProviderURL(url string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionKVProviderURL: %s", url)
|
||||
c.Datastore.Client.Address = strings.TrimSpace(url)
|
||||
c.GlobalStore.Client.Address = strings.TrimSpace(url)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,3 +124,27 @@ func IsValidName(name string) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// OptionLocalKVProvider function returns an option setter for kvstore provider
|
||||
func OptionLocalKVProvider(provider string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionLocalKVProvider: %s", provider)
|
||||
c.LocalStore.Client.Provider = strings.TrimSpace(provider)
|
||||
}
|
||||
}
|
||||
|
||||
// OptionLocalKVProviderURL function returns an option setter for kvstore url
|
||||
func OptionLocalKVProviderURL(url string) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionLocalKVProviderURL: %s", url)
|
||||
c.LocalStore.Client.Address = strings.TrimSpace(url)
|
||||
}
|
||||
}
|
||||
|
||||
// OptionLocalKVProviderConfig function returns an option setter for kvstore config
|
||||
func OptionLocalKVProviderConfig(config *store.Config) Option {
|
||||
return func(c *Config) {
|
||||
log.Infof("Option OptionLocalKVProviderConfig: %v", config)
|
||||
c.LocalStore.Client.Config = config
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,13 +120,13 @@ type endpointTable map[string]*endpoint
|
|||
type sandboxTable map[string]*sandbox
|
||||
|
||||
type controller struct {
|
||||
id string
|
||||
networks networkTable
|
||||
drivers driverTable
|
||||
sandboxes sandboxTable
|
||||
cfg *config.Config
|
||||
store datastore.DataStore
|
||||
extKeyListener net.Listener
|
||||
id string
|
||||
networks networkTable
|
||||
drivers driverTable
|
||||
sandboxes sandboxTable
|
||||
cfg *config.Config
|
||||
globalStore, localStore datastore.DataStore
|
||||
extKeyListener net.Listener
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -152,7 +152,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
|
|||
}
|
||||
|
||||
if cfg != nil {
|
||||
if err := c.initDataStore(); err != nil {
|
||||
if err := c.initGlobalStore(); err != nil {
|
||||
// Failing to initalize datastore is a bad situation to be in.
|
||||
// But it cannot fail creating the Controller
|
||||
log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err)
|
||||
|
@ -162,6 +162,9 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
|
|||
// But it cannot fail creating the Controller
|
||||
log.Debugf("Failed to Initialize Discovery : %v", err)
|
||||
}
|
||||
if err := c.initLocalStore(); err != nil {
|
||||
return nil, fmt.Errorf("Failed to Initialize LocalDatastore due to %v.", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.startExternalKeyListener(); err != nil {
|
||||
|
@ -282,6 +285,7 @@ func (c *controller) addNetwork(n *network) error {
|
|||
n.Lock()
|
||||
n.svcRecords = svcMap{}
|
||||
n.driver = dd.driver
|
||||
n.dataScope = dd.capability.DataScope
|
||||
d := n.driver
|
||||
n.Unlock()
|
||||
|
||||
|
@ -480,19 +484,6 @@ func (c *controller) loadDriver(networkType string) (*driverData, error) {
|
|||
return dd, nil
|
||||
}
|
||||
|
||||
func (c *controller) isDriverGlobalScoped(networkType string) (bool, error) {
|
||||
c.Lock()
|
||||
dd, ok := c.drivers[networkType]
|
||||
c.Unlock()
|
||||
if !ok {
|
||||
return false, types.NotFoundErrorf("driver not found for %s", networkType)
|
||||
}
|
||||
if dd.capability.Scope == driverapi.GlobalScope {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *controller) Stop() {
|
||||
c.stopExternalKeyListener()
|
||||
osl.GC()
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/docker/libkv"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libkv/store/boltdb"
|
||||
"github.com/docker/libkv/store/consul"
|
||||
"github.com/docker/libkv/store/etcd"
|
||||
"github.com/docker/libkv/store/zookeeper"
|
||||
|
@ -58,8 +59,20 @@ type KV interface {
|
|||
// True if the object exists in the datastore, false if it hasn't been stored yet.
|
||||
// When SetIndex() is called, the object has been stored.
|
||||
Exists() bool
|
||||
// DataScope indicates the storage scope of the KV object
|
||||
DataScope() DataScope
|
||||
}
|
||||
|
||||
// DataScope indicates the storage scope
|
||||
type DataScope int
|
||||
|
||||
const (
|
||||
// LocalScope indicates to store the KV object in local datastore such as boltdb
|
||||
LocalScope DataScope = iota
|
||||
// GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
|
||||
GlobalScope
|
||||
)
|
||||
|
||||
const (
|
||||
// NetworkKeyPrefix is the prefix for network key in the kv store
|
||||
NetworkKeyPrefix = "network"
|
||||
|
@ -73,6 +86,7 @@ func init() {
|
|||
consul.Register()
|
||||
zookeeper.Register()
|
||||
etcd.Register()
|
||||
boltdb.Register()
|
||||
}
|
||||
|
||||
//Key provides convenient method to create a Key
|
||||
|
@ -94,8 +108,11 @@ func ParseKey(key string) ([]string, error) {
|
|||
}
|
||||
|
||||
// newClient used to connect to KV Store
|
||||
func newClient(kv string, addrs string) (DataStore, error) {
|
||||
store, err := libkv.NewStore(store.Backend(kv), []string{addrs}, &store.Config{})
|
||||
func newClient(kv string, addrs string, config *store.Config) (DataStore, error) {
|
||||
if config == nil {
|
||||
config = &store.Config{}
|
||||
}
|
||||
store, err := libkv.NewStore(store.Backend(kv), []string{addrs}, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -109,7 +126,7 @@ func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) {
|
|||
return nil, types.BadRequestErrorf("invalid configuration passed to datastore")
|
||||
}
|
||||
// TODO : cfg.Embedded case
|
||||
return newClient(cfg.Client.Provider, cfg.Client.Address)
|
||||
return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
|
||||
}
|
||||
|
||||
// NewCustomDataStore can be used by clients to plugin cusom datatore that adhers to store.Store
|
||||
|
|
|
@ -162,6 +162,10 @@ func (n *dummyObject) Exists() bool {
|
|||
return n.DBExists
|
||||
}
|
||||
|
||||
func (n *dummyObject) DataScope() DataScope {
|
||||
return LocalScope
|
||||
}
|
||||
|
||||
func (n *dummyObject) MarshalJSON() ([]byte, error) {
|
||||
netMap := make(map[string]interface{})
|
||||
netMap["name"] = n.Name
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package driverapi
|
||||
|
||||
import "net"
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
)
|
||||
|
||||
// NetworkPluginEndpointType represents the Endpoint Type used by Plugin system
|
||||
const NetworkPluginEndpointType = "NetworkDriver"
|
||||
|
@ -98,17 +102,7 @@ type DriverCallback interface {
|
|||
RegisterDriver(name string, driver Driver, capability Capability) error
|
||||
}
|
||||
|
||||
// Scope indicates the drivers scope capability
|
||||
type Scope int
|
||||
|
||||
const (
|
||||
// LocalScope represents the driver capable of providing networking services for containers in a single host
|
||||
LocalScope Scope = iota
|
||||
// GlobalScope represents the driver capable of providing networking services for containers across hosts
|
||||
GlobalScope
|
||||
)
|
||||
|
||||
// Capability represents the high level capabilities of the drivers which libnetwork can make use of
|
||||
type Capability struct {
|
||||
Scope Scope
|
||||
DataScope datastore.DataScope
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"syscall"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
"github.com/docker/libnetwork/ipallocator"
|
||||
"github.com/docker/libnetwork/iptables"
|
||||
|
@ -133,7 +134,7 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
|
|||
}
|
||||
|
||||
c := driverapi.Capability{
|
||||
Scope: driverapi.LocalScope,
|
||||
DataScope: datastore.LocalScope,
|
||||
}
|
||||
return dc.RegisterDriver(networkType, d, c)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package host
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
"github.com/docker/libnetwork/types"
|
||||
)
|
||||
|
@ -17,7 +18,7 @@ type driver struct {
|
|||
// Init registers a new instance of host driver
|
||||
func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
|
||||
c := driverapi.Capability{
|
||||
Scope: driverapi.LocalScope,
|
||||
DataScope: datastore.LocalScope,
|
||||
}
|
||||
return dc.RegisterDriver(networkType, &driver{}, c)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package null
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
"github.com/docker/libnetwork/types"
|
||||
)
|
||||
|
@ -17,7 +18,7 @@ type driver struct {
|
|||
// Init registers a new instance of null driver
|
||||
func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
|
||||
c := driverapi.Capability{
|
||||
Scope: driverapi.LocalScope,
|
||||
DataScope: datastore.LocalScope,
|
||||
}
|
||||
return dc.RegisterDriver(networkType, &driver{}, c)
|
||||
}
|
||||
|
|
|
@ -306,6 +306,10 @@ func (n *network) SetValue(value []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (n *network) DataScope() datastore.DataScope {
|
||||
return datastore.GlobalScope
|
||||
}
|
||||
|
||||
func (n *network) writeToStore() error {
|
||||
return n.driver.store.PutObjectAtomic(n)
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libnetwork/config"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
|
@ -71,7 +72,7 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
|
|||
once.Do(onceInit)
|
||||
|
||||
c := driverapi.Capability{
|
||||
Scope: driverapi.GlobalScope,
|
||||
DataScope: datastore.GlobalScope,
|
||||
}
|
||||
|
||||
d := &driver{
|
||||
|
@ -130,6 +131,10 @@ func (d *driver) configure(option map[string]interface{}) error {
|
|||
Address: provURL.(string),
|
||||
},
|
||||
}
|
||||
provConfig, confOk := option[netlabel.KVProviderConfig]
|
||||
if confOk {
|
||||
cfg.Client.Config = provConfig.(*store.Config)
|
||||
}
|
||||
d.store, err = datastore.NewDataStore(cfg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to initialize data store: %v", err)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/docker/pkg/plugins"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
"github.com/docker/libnetwork/drivers/remote/api"
|
||||
"github.com/docker/libnetwork/types"
|
||||
|
@ -52,9 +53,9 @@ func (d *driver) getCapabilities() (*driverapi.Capability, error) {
|
|||
c := &driverapi.Capability{}
|
||||
switch capResp.Scope {
|
||||
case "global":
|
||||
c.Scope = driverapi.GlobalScope
|
||||
c.DataScope = datastore.GlobalScope
|
||||
case "local":
|
||||
c.Scope = driverapi.LocalScope
|
||||
c.DataScope = datastore.LocalScope
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid capability: expecting 'local' or 'global', got %s", capResp.Scope)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package windows
|
||||
|
||||
import "github.com/docker/libnetwork/driverapi"
|
||||
import (
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
)
|
||||
|
||||
const networkType = "windows"
|
||||
|
||||
|
@ -11,7 +14,7 @@ type driver struct{}
|
|||
// Init registers a new instance of null driver
|
||||
func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
|
||||
c := driverapi.Capability{
|
||||
Scope: driverapi.LocalScope,
|
||||
DataScope: datastore.LocalScope,
|
||||
}
|
||||
return dc.RegisterDriver(networkType, &driver{}, c)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
|
@ -130,15 +131,15 @@ func (ep *endpoint) KeyPrefix() []string {
|
|||
return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id}
|
||||
}
|
||||
|
||||
func (ep *endpoint) networkIDFromKey(key []string) (string, error) {
|
||||
// endpoint Key structure : endpoint/network-id/endpoint-id
|
||||
// it's an invalid key if the key doesn't have all the 3 key elements above
|
||||
if key == nil || len(key) < 3 || key[0] != datastore.EndpointKeyPrefix {
|
||||
func (ep *endpoint) networkIDFromKey(key string) (string, error) {
|
||||
// endpoint Key structure : docker/libnetwork/endpoint/${network-id}/${endpoint-id}
|
||||
// it's an invalid key if the key doesn't have all the 5 key elements above
|
||||
keyElements := strings.Split(key, "/")
|
||||
if !strings.HasPrefix(key, datastore.Key(datastore.EndpointKeyPrefix)) || len(keyElements) < 5 {
|
||||
return "", fmt.Errorf("invalid endpoint key : %v", key)
|
||||
}
|
||||
|
||||
// network-id is placed at index=1. pls refer to endpoint.Key() method
|
||||
return key[1], nil
|
||||
// network-id is placed at index=3. pls refer to endpoint.Key() method
|
||||
return strings.Split(key, "/")[3], nil
|
||||
}
|
||||
|
||||
func (ep *endpoint) Value() []byte {
|
||||
|
@ -540,3 +541,9 @@ func JoinOptionPriority(ep Endpoint, prio int) EndpointOption {
|
|||
sb.epPriority[ep.id] = prio
|
||||
}
|
||||
}
|
||||
|
||||
func (ep *endpoint) DataScope() datastore.DataScope {
|
||||
ep.Lock()
|
||||
defer ep.Unlock()
|
||||
return ep.network.dataScope
|
||||
}
|
||||
|
|
|
@ -176,3 +176,8 @@ func (a *Allocator) deleteFromStore() error {
|
|||
}
|
||||
return store.DeleteObjectAtomic(a)
|
||||
}
|
||||
|
||||
// DataScope method returns the storage scope of the datastore
|
||||
func (a *Allocator) DataScope() datastore.DataScope {
|
||||
return datastore.GlobalScope
|
||||
}
|
||||
|
|
|
@ -28,5 +28,5 @@ func TestDriverRegistration(t *testing.T) {
|
|||
|
||||
func SetTestDataStore(c NetworkController, custom datastore.DataStore) {
|
||||
con := c.(*controller)
|
||||
con.store = custom
|
||||
con.globalStore = custom
|
||||
}
|
||||
|
|
|
@ -30,6 +30,9 @@ const (
|
|||
// KVProviderURL constant represents the KV provider URL
|
||||
KVProviderURL = DriverPrefix + ".kv_provider_url"
|
||||
|
||||
// KVProviderConfig constant represents the KV provider Config
|
||||
KVProviderConfig = DriverPrefix + ".kv_provider_config"
|
||||
|
||||
// OverlayBindInterface constant represents overlay driver bind interface
|
||||
OverlayBindInterface = DriverPrefix + ".overlay.bind_interface"
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@ type network struct {
|
|||
svcRecords svcMap
|
||||
dbExists bool
|
||||
stopWatchCh chan struct{}
|
||||
dataScope datastore.DataScope
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -140,6 +141,12 @@ func (n *network) Exists() bool {
|
|||
return n.dbExists
|
||||
}
|
||||
|
||||
func (n *network) DataScope() datastore.DataScope {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
return n.dataScope
|
||||
}
|
||||
|
||||
func (n *network) EndpointCnt() uint64 {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
@ -398,11 +405,8 @@ func (n *network) EndpointByID(id string) (Endpoint, error) {
|
|||
return nil, ErrNoSuchEndpoint(id)
|
||||
}
|
||||
|
||||
func (n *network) isGlobalScoped() (bool, error) {
|
||||
n.Lock()
|
||||
c := n.ctrlr
|
||||
n.Unlock()
|
||||
return c.isDriverGlobalScoped(n.networkType)
|
||||
func (n *network) isGlobalScoped() bool {
|
||||
return n.DataScope() == datastore.GlobalScope
|
||||
}
|
||||
|
||||
func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) {
|
||||
|
|
|
@ -6,41 +6,82 @@ import (
|
|||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libnetwork/config"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
)
|
||||
|
||||
func (c *controller) validateDatastoreConfig() bool {
|
||||
return c.cfg != nil && c.cfg.Datastore.Client.Provider != "" && c.cfg.Datastore.Client.Address != ""
|
||||
var (
|
||||
defaultLocalStoreConfig = config.DatastoreCfg{
|
||||
Embedded: true,
|
||||
Client: config.DatastoreClientCfg{
|
||||
Provider: "boltdb",
|
||||
Address: defaultPrefix + "/boltdb.db",
|
||||
Config: &store.Config{
|
||||
Bucket: "libnetwork",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func (c *controller) validateGlobalStoreConfig() bool {
|
||||
return c.cfg != nil && c.cfg.GlobalStore.Client.Provider != "" && c.cfg.GlobalStore.Client.Address != ""
|
||||
}
|
||||
|
||||
func (c *controller) initDataStore() error {
|
||||
func (c *controller) initGlobalStore() error {
|
||||
c.Lock()
|
||||
cfg := c.cfg
|
||||
c.Unlock()
|
||||
if !c.validateDatastoreConfig() {
|
||||
return fmt.Errorf("datastore initialization requires a valid configuration")
|
||||
if !c.validateGlobalStoreConfig() {
|
||||
return fmt.Errorf("globalstore initialization requires a valid configuration")
|
||||
}
|
||||
|
||||
store, err := datastore.NewDataStore(&cfg.Datastore)
|
||||
store, err := datastore.NewDataStore(&cfg.GlobalStore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
c.store = store
|
||||
c.globalStore = store
|
||||
c.Unlock()
|
||||
|
||||
nws, err := c.getNetworksFromStore()
|
||||
nws, err := c.getNetworksFromGlobalStore()
|
||||
if err == nil {
|
||||
c.processNetworkUpdate(nws, nil)
|
||||
} else if err != datastore.ErrKeyNotFound {
|
||||
log.Warnf("failed to read networks from datastore during init : %v", err)
|
||||
log.Warnf("failed to read networks from globalstore during init : %v", err)
|
||||
}
|
||||
return c.watchNetworks()
|
||||
}
|
||||
|
||||
func (c *controller) getNetworksFromStore() ([]*store.KVPair, error) {
|
||||
func (c *controller) initLocalStore() error {
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
cfg := c.cfg
|
||||
c.Unlock()
|
||||
localStore, err := datastore.NewDataStore(c.getLocalStoreConfig(cfg))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
c.localStore = localStore
|
||||
c.Unlock()
|
||||
|
||||
nws, err := c.getNetworksFromLocalStore()
|
||||
if err == nil {
|
||||
c.processNetworkUpdate(nws, nil)
|
||||
} else if err != datastore.ErrKeyNotFound {
|
||||
log.Warnf("failed to read networks from localstore during init : %v", err)
|
||||
}
|
||||
eps, err := c.getEndpointsFromLocalStore()
|
||||
if err == nil {
|
||||
c.processEndpointsUpdate(eps, nil)
|
||||
} else if err != datastore.ErrKeyNotFound {
|
||||
log.Warnf("failed to read endpoints from localstore during init : %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) getNetworksFromGlobalStore() ([]*store.KVPair, error) {
|
||||
c.Lock()
|
||||
cs := c.globalStore
|
||||
c.Unlock()
|
||||
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
|
||||
}
|
||||
|
@ -55,13 +96,7 @@ func (c *controller) newNetworkFromStore(n *network) error {
|
|||
}
|
||||
|
||||
func (c *controller) updateNetworkToStore(n *network) error {
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
cs := c.getDataStore(n.DataScope())
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
|
||||
return nil
|
||||
|
@ -71,13 +106,7 @@ func (c *controller) updateNetworkToStore(n *network) error {
|
|||
}
|
||||
|
||||
func (c *controller) deleteNetworkFromStore(n *network) error {
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
cs := c.getDataStore(n.DataScope())
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name())
|
||||
return nil
|
||||
|
@ -90,14 +119,6 @@ func (c *controller) deleteNetworkFromStore(n *network) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) getNetworkFromStore(nid string) (*network, error) {
|
||||
n := network{id: nid}
|
||||
if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &n, nil
|
||||
}
|
||||
|
||||
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
|
||||
ep.Lock()
|
||||
n := ep.network
|
||||
|
@ -115,16 +136,9 @@ func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
|
|||
|
||||
func (c *controller) updateEndpointToStore(ep *endpoint) error {
|
||||
ep.Lock()
|
||||
n := ep.network
|
||||
name := ep.name
|
||||
ep.Unlock()
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
cs := c.getDataStore(ep.DataScope())
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name)
|
||||
return nil
|
||||
|
@ -133,26 +147,8 @@ func (c *controller) updateEndpointToStore(ep *endpoint) error {
|
|||
return cs.PutObjectAtomic(ep)
|
||||
}
|
||||
|
||||
func (c *controller) getEndpointFromStore(eid string) (*endpoint, error) {
|
||||
ep := endpoint{id: eid}
|
||||
if err := c.store.GetObject(datastore.Key(ep.Key()...), &ep); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ep, nil
|
||||
}
|
||||
|
||||
func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
|
||||
ep.Lock()
|
||||
n := ep.network
|
||||
ep.Unlock()
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
cs := c.getDataStore(ep.DataScope())
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name())
|
||||
return nil
|
||||
|
@ -166,12 +162,12 @@ func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
|
|||
}
|
||||
|
||||
func (c *controller) watchNetworks() error {
|
||||
if !c.validateDatastoreConfig() {
|
||||
if !c.validateGlobalStoreConfig() {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
cs := c.globalStore
|
||||
c.Unlock()
|
||||
|
||||
networkKey := datastore.Key(datastore.NetworkKeyPrefix)
|
||||
|
@ -191,8 +187,7 @@ func (c *controller) watchNetworks() error {
|
|||
lview := c.networks
|
||||
c.Unlock()
|
||||
for k, v := range lview {
|
||||
global, _ := v.isGlobalScoped()
|
||||
if global {
|
||||
if v.isGlobalScoped() {
|
||||
tmpview[k] = v
|
||||
}
|
||||
}
|
||||
|
@ -207,7 +202,7 @@ func (c *controller) watchNetworks() error {
|
|||
continue
|
||||
}
|
||||
tmp := network{}
|
||||
if err := c.store.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
|
||||
if err := c.globalStore.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
|
||||
continue
|
||||
}
|
||||
if err := existing.deleteNetwork(); err != nil {
|
||||
|
@ -221,12 +216,12 @@ func (c *controller) watchNetworks() error {
|
|||
}
|
||||
|
||||
func (n *network) watchEndpoints() error {
|
||||
if !n.ctrlr.validateDatastoreConfig() {
|
||||
if !n.ctrlr.validateGlobalStoreConfig() {
|
||||
return nil
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
cs := n.ctrlr.store
|
||||
cs := n.ctrlr.globalStore
|
||||
tmp := endpoint{network: n}
|
||||
n.stopWatchCh = make(chan struct{})
|
||||
stopCh := n.stopWatchCh
|
||||
|
@ -251,28 +246,11 @@ func (n *network) watchEndpoints() error {
|
|||
lview := n.endpoints
|
||||
n.Unlock()
|
||||
for k, v := range lview {
|
||||
global, _ := v.network.isGlobalScoped()
|
||||
if global {
|
||||
if v.network.isGlobalScoped() {
|
||||
tmpview[k] = v
|
||||
}
|
||||
}
|
||||
for _, epe := range eps {
|
||||
var ep endpoint
|
||||
err := json.Unmarshal(epe.Value, &ep)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
delete(tmpview, ep.id)
|
||||
ep.SetIndex(epe.LastIndex)
|
||||
ep.network = n
|
||||
if n.ctrlr.processEndpointUpdate(&ep) {
|
||||
err = n.ctrlr.newEndpointFromStore(epe.Key, &ep)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
n.ctrlr.processEndpointsUpdate(eps, &tmpview)
|
||||
// Delete processing
|
||||
for k := range tmpview {
|
||||
n.Lock()
|
||||
|
@ -381,3 +359,67 @@ func ensureKeys(key string, cs datastore.DataStore) error {
|
|||
}
|
||||
return cs.KVStore().Put(key, []byte{}, nil)
|
||||
}
|
||||
|
||||
func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCfg {
|
||||
if cfg != nil && cfg.LocalStore.Client.Provider != "" && cfg.LocalStore.Client.Address != "" {
|
||||
return &cfg.LocalStore
|
||||
}
|
||||
return &defaultLocalStoreConfig
|
||||
}
|
||||
|
||||
func (c *controller) getNetworksFromLocalStore() ([]*store.KVPair, error) {
|
||||
c.Lock()
|
||||
cs := c.localStore
|
||||
c.Unlock()
|
||||
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
|
||||
}
|
||||
|
||||
func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) {
|
||||
c.Lock()
|
||||
if dataScope == datastore.GlobalScope {
|
||||
dataStore = c.globalStore
|
||||
} else if dataScope == datastore.LocalScope {
|
||||
dataStore = c.localStore
|
||||
}
|
||||
c.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (c *controller) getEndpointsFromLocalStore() ([]*store.KVPair, error) {
|
||||
c.Lock()
|
||||
cs := c.localStore
|
||||
c.Unlock()
|
||||
return cs.KVStore().List(datastore.Key(datastore.EndpointKeyPrefix))
|
||||
}
|
||||
|
||||
func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) {
|
||||
for _, epe := range eps {
|
||||
var ep endpoint
|
||||
err := json.Unmarshal(epe.Value, &ep)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if prune != nil {
|
||||
delete(*prune, ep.id)
|
||||
}
|
||||
ep.SetIndex(epe.LastIndex)
|
||||
if nid, err := ep.networkIDFromKey(epe.Key); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
} else {
|
||||
if n, err := c.NetworkByID(nid); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
} else {
|
||||
ep.network = n.(*network)
|
||||
}
|
||||
}
|
||||
if c.processEndpointUpdate(&ep) {
|
||||
err = c.newEndpointFromStore(epe.Key, &ep)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue