diff --git a/libnetwork/bitseq/store.go b/libnetwork/bitseq/store.go index 553f2cdf4e..a6550f415e 100644 --- a/libnetwork/bitseq/store.go +++ b/libnetwork/bitseq/store.go @@ -70,6 +70,11 @@ func (h *Handle) Exists() bool { return h.dbExists } +// DataScope method returns the storage scope of the datastore +func (h *Handle) DataScope() datastore.DataScope { + return datastore.GlobalScope +} + func (h *Handle) watchForChanges() error { h.Lock() store := h.store diff --git a/libnetwork/cmd/dnet/dnet.go b/libnetwork/cmd/dnet/dnet.go index 1d7c33150c..ee114d80f6 100644 --- a/libnetwork/cmd/dnet/dnet.go +++ b/libnetwork/cmd/dnet/dnet.go @@ -83,11 +83,11 @@ func processConfig(cfg *config.Config) []config.Option { if cfg.Daemon.Labels != nil { options = append(options, config.OptionLabels(cfg.Daemon.Labels)) } - if strings.TrimSpace(cfg.Datastore.Client.Provider) != "" { - options = append(options, config.OptionKVProvider(cfg.Datastore.Client.Provider)) + if strings.TrimSpace(cfg.GlobalStore.Client.Provider) != "" { + options = append(options, config.OptionKVProvider(cfg.GlobalStore.Client.Provider)) } - if strings.TrimSpace(cfg.Datastore.Client.Address) != "" { - options = append(options, config.OptionKVProviderURL(cfg.Datastore.Client.Address)) + if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" { + options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address)) } return options } diff --git a/libnetwork/config/config.go b/libnetwork/config/config.go index 504ce21f5f..3163800174 100644 --- a/libnetwork/config/config.go +++ b/libnetwork/config/config.go @@ -5,14 +5,15 @@ import ( "github.com/BurntSushi/toml" log "github.com/Sirupsen/logrus" + "github.com/docker/libkv/store" "github.com/docker/libnetwork/netlabel" ) // Config encapsulates configurations of various Libnetwork components type Config struct { - Daemon DaemonCfg - Cluster ClusterCfg - Datastore DatastoreCfg + Daemon DaemonCfg + Cluster ClusterCfg + GlobalStore, LocalStore DatastoreCfg } // DaemonCfg represents libnetwork core configuration @@ -41,6 +42,7 @@ type DatastoreCfg struct { type DatastoreClientCfg struct { Provider string Address string + Config *store.Config } // ParseConfig parses the libnetwork configuration file @@ -94,7 +96,7 @@ func OptionLabels(labels []string) Option { func OptionKVProvider(provider string) Option { return func(c *Config) { log.Infof("Option OptionKVProvider: %s", provider) - c.Datastore.Client.Provider = strings.TrimSpace(provider) + c.GlobalStore.Client.Provider = strings.TrimSpace(provider) } } @@ -102,7 +104,7 @@ func OptionKVProvider(provider string) Option { func OptionKVProviderURL(url string) Option { return func(c *Config) { log.Infof("Option OptionKVProviderURL: %s", url) - c.Datastore.Client.Address = strings.TrimSpace(url) + c.GlobalStore.Client.Address = strings.TrimSpace(url) } } @@ -122,3 +124,27 @@ func IsValidName(name string) bool { } return true } + +// OptionLocalKVProvider function returns an option setter for kvstore provider +func OptionLocalKVProvider(provider string) Option { + return func(c *Config) { + log.Infof("Option OptionLocalKVProvider: %s", provider) + c.LocalStore.Client.Provider = strings.TrimSpace(provider) + } +} + +// OptionLocalKVProviderURL function returns an option setter for kvstore url +func OptionLocalKVProviderURL(url string) Option { + return func(c *Config) { + log.Infof("Option OptionLocalKVProviderURL: %s", url) + c.LocalStore.Client.Address = strings.TrimSpace(url) + } +} + +// OptionLocalKVProviderConfig function returns an option setter for kvstore config +func OptionLocalKVProviderConfig(config *store.Config) Option { + return func(c *Config) { + log.Infof("Option OptionLocalKVProviderConfig: %v", config) + c.LocalStore.Client.Config = config + } +} diff --git a/libnetwork/controller.go b/libnetwork/controller.go index ae27ff0a2c..ed05d23ad7 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -120,13 +120,13 @@ type endpointTable map[string]*endpoint type sandboxTable map[string]*sandbox type controller struct { - id string - networks networkTable - drivers driverTable - sandboxes sandboxTable - cfg *config.Config - store datastore.DataStore - extKeyListener net.Listener + id string + networks networkTable + drivers driverTable + sandboxes sandboxTable + cfg *config.Config + globalStore, localStore datastore.DataStore + extKeyListener net.Listener sync.Mutex } @@ -152,7 +152,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { } if cfg != nil { - if err := c.initDataStore(); err != nil { + if err := c.initGlobalStore(); err != nil { // Failing to initalize datastore is a bad situation to be in. // But it cannot fail creating the Controller log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err) @@ -162,6 +162,9 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { // But it cannot fail creating the Controller log.Debugf("Failed to Initialize Discovery : %v", err) } + if err := c.initLocalStore(); err != nil { + return nil, fmt.Errorf("Failed to Initialize LocalDatastore due to %v.", err) + } } if err := c.startExternalKeyListener(); err != nil { @@ -282,6 +285,7 @@ func (c *controller) addNetwork(n *network) error { n.Lock() n.svcRecords = svcMap{} n.driver = dd.driver + n.dataScope = dd.capability.DataScope d := n.driver n.Unlock() @@ -480,19 +484,6 @@ func (c *controller) loadDriver(networkType string) (*driverData, error) { return dd, nil } -func (c *controller) isDriverGlobalScoped(networkType string) (bool, error) { - c.Lock() - dd, ok := c.drivers[networkType] - c.Unlock() - if !ok { - return false, types.NotFoundErrorf("driver not found for %s", networkType) - } - if dd.capability.Scope == driverapi.GlobalScope { - return true, nil - } - return false, nil -} - func (c *controller) Stop() { c.stopExternalKeyListener() osl.GC() diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index f504860f0a..07a0df5c6b 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -6,6 +6,7 @@ import ( "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/boltdb" "github.com/docker/libkv/store/consul" "github.com/docker/libkv/store/etcd" "github.com/docker/libkv/store/zookeeper" @@ -58,8 +59,20 @@ type KV interface { // True if the object exists in the datastore, false if it hasn't been stored yet. // When SetIndex() is called, the object has been stored. Exists() bool + // DataScope indicates the storage scope of the KV object + DataScope() DataScope } +// DataScope indicates the storage scope +type DataScope int + +const ( + // LocalScope indicates to store the KV object in local datastore such as boltdb + LocalScope DataScope = iota + // GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper + GlobalScope +) + const ( // NetworkKeyPrefix is the prefix for network key in the kv store NetworkKeyPrefix = "network" @@ -73,6 +86,7 @@ func init() { consul.Register() zookeeper.Register() etcd.Register() + boltdb.Register() } //Key provides convenient method to create a Key @@ -94,8 +108,11 @@ func ParseKey(key string) ([]string, error) { } // newClient used to connect to KV Store -func newClient(kv string, addrs string) (DataStore, error) { - store, err := libkv.NewStore(store.Backend(kv), []string{addrs}, &store.Config{}) +func newClient(kv string, addrs string, config *store.Config) (DataStore, error) { + if config == nil { + config = &store.Config{} + } + store, err := libkv.NewStore(store.Backend(kv), []string{addrs}, config) if err != nil { return nil, err } @@ -109,7 +126,7 @@ func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) { return nil, types.BadRequestErrorf("invalid configuration passed to datastore") } // TODO : cfg.Embedded case - return newClient(cfg.Client.Provider, cfg.Client.Address) + return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config) } // NewCustomDataStore can be used by clients to plugin cusom datatore that adhers to store.Store diff --git a/libnetwork/datastore/datastore_test.go b/libnetwork/datastore/datastore_test.go index b2e6572d1b..fe2befd5ab 100644 --- a/libnetwork/datastore/datastore_test.go +++ b/libnetwork/datastore/datastore_test.go @@ -162,6 +162,10 @@ func (n *dummyObject) Exists() bool { return n.DBExists } +func (n *dummyObject) DataScope() DataScope { + return LocalScope +} + func (n *dummyObject) MarshalJSON() ([]byte, error) { netMap := make(map[string]interface{}) netMap["name"] = n.Name diff --git a/libnetwork/driverapi/driverapi.go b/libnetwork/driverapi/driverapi.go index 694b0429f6..054442e753 100644 --- a/libnetwork/driverapi/driverapi.go +++ b/libnetwork/driverapi/driverapi.go @@ -1,6 +1,10 @@ package driverapi -import "net" +import ( + "net" + + "github.com/docker/libnetwork/datastore" +) // NetworkPluginEndpointType represents the Endpoint Type used by Plugin system const NetworkPluginEndpointType = "NetworkDriver" @@ -98,17 +102,7 @@ type DriverCallback interface { RegisterDriver(name string, driver Driver, capability Capability) error } -// Scope indicates the drivers scope capability -type Scope int - -const ( - // LocalScope represents the driver capable of providing networking services for containers in a single host - LocalScope Scope = iota - // GlobalScope represents the driver capable of providing networking services for containers across hosts - GlobalScope -) - // Capability represents the high level capabilities of the drivers which libnetwork can make use of type Capability struct { - Scope Scope + DataScope datastore.DataScope } diff --git a/libnetwork/drivers/bridge/bridge.go b/libnetwork/drivers/bridge/bridge.go index 346d8668b1..21e1f2b35d 100644 --- a/libnetwork/drivers/bridge/bridge.go +++ b/libnetwork/drivers/bridge/bridge.go @@ -14,6 +14,7 @@ import ( "syscall" "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipallocator" "github.com/docker/libnetwork/iptables" @@ -133,7 +134,7 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { } c := driverapi.Capability{ - Scope: driverapi.LocalScope, + DataScope: datastore.LocalScope, } return dc.RegisterDriver(networkType, d, c) } diff --git a/libnetwork/drivers/host/host.go b/libnetwork/drivers/host/host.go index eac1af2055..747bdc62c9 100644 --- a/libnetwork/drivers/host/host.go +++ b/libnetwork/drivers/host/host.go @@ -3,6 +3,7 @@ package host import ( "sync" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/types" ) @@ -17,7 +18,7 @@ type driver struct { // Init registers a new instance of host driver func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { c := driverapi.Capability{ - Scope: driverapi.LocalScope, + DataScope: datastore.LocalScope, } return dc.RegisterDriver(networkType, &driver{}, c) } diff --git a/libnetwork/drivers/null/null.go b/libnetwork/drivers/null/null.go index 4a0e92b958..6f472e78a4 100644 --- a/libnetwork/drivers/null/null.go +++ b/libnetwork/drivers/null/null.go @@ -3,6 +3,7 @@ package null import ( "sync" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/types" ) @@ -17,7 +18,7 @@ type driver struct { // Init registers a new instance of null driver func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { c := driverapi.Capability{ - Scope: driverapi.LocalScope, + DataScope: datastore.LocalScope, } return dc.RegisterDriver(networkType, &driver{}, c) } diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go index e55cc2e8ef..55d8edba50 100644 --- a/libnetwork/drivers/overlay/ov_network.go +++ b/libnetwork/drivers/overlay/ov_network.go @@ -306,6 +306,10 @@ func (n *network) SetValue(value []byte) error { return err } +func (n *network) DataScope() datastore.DataScope { + return datastore.GlobalScope +} + func (n *network) writeToStore() error { return n.driver.store.PutObjectAtomic(n) } diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go index 606e723244..c831ec0e77 100644 --- a/libnetwork/drivers/overlay/overlay.go +++ b/libnetwork/drivers/overlay/overlay.go @@ -6,6 +6,7 @@ import ( "net" "sync" + "github.com/docker/libkv/store" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" @@ -71,7 +72,7 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { once.Do(onceInit) c := driverapi.Capability{ - Scope: driverapi.GlobalScope, + DataScope: datastore.GlobalScope, } d := &driver{ @@ -130,6 +131,10 @@ func (d *driver) configure(option map[string]interface{}) error { Address: provURL.(string), }, } + provConfig, confOk := option[netlabel.KVProviderConfig] + if confOk { + cfg.Client.Config = provConfig.(*store.Config) + } d.store, err = datastore.NewDataStore(cfg) if err != nil { err = fmt.Errorf("failed to initialize data store: %v", err) diff --git a/libnetwork/drivers/remote/driver.go b/libnetwork/drivers/remote/driver.go index 10865f2c81..5f5a0f5b25 100644 --- a/libnetwork/drivers/remote/driver.go +++ b/libnetwork/drivers/remote/driver.go @@ -6,6 +6,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/plugins" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/drivers/remote/api" "github.com/docker/libnetwork/types" @@ -52,9 +53,9 @@ func (d *driver) getCapabilities() (*driverapi.Capability, error) { c := &driverapi.Capability{} switch capResp.Scope { case "global": - c.Scope = driverapi.GlobalScope + c.DataScope = datastore.GlobalScope case "local": - c.Scope = driverapi.LocalScope + c.DataScope = datastore.LocalScope default: return nil, fmt.Errorf("invalid capability: expecting 'local' or 'global', got %s", capResp.Scope) } diff --git a/libnetwork/drivers/windows/windows.go b/libnetwork/drivers/windows/windows.go index efca69a99b..82fc61b7c0 100644 --- a/libnetwork/drivers/windows/windows.go +++ b/libnetwork/drivers/windows/windows.go @@ -1,6 +1,9 @@ package windows -import "github.com/docker/libnetwork/driverapi" +import ( + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/driverapi" +) const networkType = "windows" @@ -11,7 +14,7 @@ type driver struct{} // Init registers a new instance of null driver func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { c := driverapi.Capability{ - Scope: driverapi.LocalScope, + DataScope: datastore.LocalScope, } return dc.RegisterDriver(networkType, &driver{}, c) } diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 6980a868e9..ab8645c5e7 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "strings" "sync" log "github.com/Sirupsen/logrus" @@ -130,15 +131,15 @@ func (ep *endpoint) KeyPrefix() []string { return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id} } -func (ep *endpoint) networkIDFromKey(key []string) (string, error) { - // endpoint Key structure : endpoint/network-id/endpoint-id - // it's an invalid key if the key doesn't have all the 3 key elements above - if key == nil || len(key) < 3 || key[0] != datastore.EndpointKeyPrefix { +func (ep *endpoint) networkIDFromKey(key string) (string, error) { + // endpoint Key structure : docker/libnetwork/endpoint/${network-id}/${endpoint-id} + // it's an invalid key if the key doesn't have all the 5 key elements above + keyElements := strings.Split(key, "/") + if !strings.HasPrefix(key, datastore.Key(datastore.EndpointKeyPrefix)) || len(keyElements) < 5 { return "", fmt.Errorf("invalid endpoint key : %v", key) } - - // network-id is placed at index=1. pls refer to endpoint.Key() method - return key[1], nil + // network-id is placed at index=3. pls refer to endpoint.Key() method + return strings.Split(key, "/")[3], nil } func (ep *endpoint) Value() []byte { @@ -540,3 +541,9 @@ func JoinOptionPriority(ep Endpoint, prio int) EndpointOption { sb.epPriority[ep.id] = prio } } + +func (ep *endpoint) DataScope() datastore.DataScope { + ep.Lock() + defer ep.Unlock() + return ep.network.dataScope +} diff --git a/libnetwork/ipam/store.go b/libnetwork/ipam/store.go index 1e8d740609..2e204980be 100644 --- a/libnetwork/ipam/store.go +++ b/libnetwork/ipam/store.go @@ -176,3 +176,8 @@ func (a *Allocator) deleteFromStore() error { } return store.DeleteObjectAtomic(a) } + +// DataScope method returns the storage scope of the datastore +func (a *Allocator) DataScope() datastore.DataScope { + return datastore.GlobalScope +} diff --git a/libnetwork/libnetwork_internal_test.go b/libnetwork/libnetwork_internal_test.go index 317afbb7f3..bd93129cc6 100644 --- a/libnetwork/libnetwork_internal_test.go +++ b/libnetwork/libnetwork_internal_test.go @@ -28,5 +28,5 @@ func TestDriverRegistration(t *testing.T) { func SetTestDataStore(c NetworkController, custom datastore.DataStore) { con := c.(*controller) - con.store = custom + con.globalStore = custom } diff --git a/libnetwork/netlabel/labels.go b/libnetwork/netlabel/labels.go index 42779be8a4..d2db2403ed 100644 --- a/libnetwork/netlabel/labels.go +++ b/libnetwork/netlabel/labels.go @@ -30,6 +30,9 @@ const ( // KVProviderURL constant represents the KV provider URL KVProviderURL = DriverPrefix + ".kv_provider_url" + // KVProviderConfig constant represents the KV provider Config + KVProviderConfig = DriverPrefix + ".kv_provider_config" + // OverlayBindInterface constant represents overlay driver bind interface OverlayBindInterface = DriverPrefix + ".overlay.bind_interface" diff --git a/libnetwork/network.go b/libnetwork/network.go index e667e2f46d..a677566f33 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -69,6 +69,7 @@ type network struct { svcRecords svcMap dbExists bool stopWatchCh chan struct{} + dataScope datastore.DataScope sync.Mutex } @@ -140,6 +141,12 @@ func (n *network) Exists() bool { return n.dbExists } +func (n *network) DataScope() datastore.DataScope { + n.Lock() + defer n.Unlock() + return n.dataScope +} + func (n *network) EndpointCnt() uint64 { n.Lock() defer n.Unlock() @@ -398,11 +405,8 @@ func (n *network) EndpointByID(id string) (Endpoint, error) { return nil, ErrNoSuchEndpoint(id) } -func (n *network) isGlobalScoped() (bool, error) { - n.Lock() - c := n.ctrlr - n.Unlock() - return c.isDriverGlobalScoped(n.networkType) +func (n *network) isGlobalScoped() bool { + return n.DataScope() == datastore.GlobalScope } func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) { diff --git a/libnetwork/store.go b/libnetwork/store.go index 42a14de332..b8c7d55f90 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -6,41 +6,82 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" + "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" ) -func (c *controller) validateDatastoreConfig() bool { - return c.cfg != nil && c.cfg.Datastore.Client.Provider != "" && c.cfg.Datastore.Client.Address != "" +var ( + defaultLocalStoreConfig = config.DatastoreCfg{ + Embedded: true, + Client: config.DatastoreClientCfg{ + Provider: "boltdb", + Address: defaultPrefix + "/boltdb.db", + Config: &store.Config{ + Bucket: "libnetwork", + }, + }, + } +) + +func (c *controller) validateGlobalStoreConfig() bool { + return c.cfg != nil && c.cfg.GlobalStore.Client.Provider != "" && c.cfg.GlobalStore.Client.Address != "" } -func (c *controller) initDataStore() error { +func (c *controller) initGlobalStore() error { c.Lock() cfg := c.cfg c.Unlock() - if !c.validateDatastoreConfig() { - return fmt.Errorf("datastore initialization requires a valid configuration") + if !c.validateGlobalStoreConfig() { + return fmt.Errorf("globalstore initialization requires a valid configuration") } - store, err := datastore.NewDataStore(&cfg.Datastore) + store, err := datastore.NewDataStore(&cfg.GlobalStore) if err != nil { return err } c.Lock() - c.store = store + c.globalStore = store c.Unlock() - nws, err := c.getNetworksFromStore() + nws, err := c.getNetworksFromGlobalStore() if err == nil { c.processNetworkUpdate(nws, nil) } else if err != datastore.ErrKeyNotFound { - log.Warnf("failed to read networks from datastore during init : %v", err) + log.Warnf("failed to read networks from globalstore during init : %v", err) } return c.watchNetworks() } -func (c *controller) getNetworksFromStore() ([]*store.KVPair, error) { +func (c *controller) initLocalStore() error { c.Lock() - cs := c.store + cfg := c.cfg + c.Unlock() + localStore, err := datastore.NewDataStore(c.getLocalStoreConfig(cfg)) + if err != nil { + return err + } + c.Lock() + c.localStore = localStore + c.Unlock() + + nws, err := c.getNetworksFromLocalStore() + if err == nil { + c.processNetworkUpdate(nws, nil) + } else if err != datastore.ErrKeyNotFound { + log.Warnf("failed to read networks from localstore during init : %v", err) + } + eps, err := c.getEndpointsFromLocalStore() + if err == nil { + c.processEndpointsUpdate(eps, nil) + } else if err != datastore.ErrKeyNotFound { + log.Warnf("failed to read endpoints from localstore during init : %v", err) + } + return nil +} + +func (c *controller) getNetworksFromGlobalStore() ([]*store.KVPair, error) { + c.Lock() + cs := c.globalStore c.Unlock() return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix)) } @@ -55,13 +96,7 @@ func (c *controller) newNetworkFromStore(n *network) error { } func (c *controller) updateNetworkToStore(n *network) error { - global, err := n.isGlobalScoped() - if err != nil || !global { - return err - } - c.Lock() - cs := c.store - c.Unlock() + cs := c.getDataStore(n.DataScope()) if cs == nil { log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name()) return nil @@ -71,13 +106,7 @@ func (c *controller) updateNetworkToStore(n *network) error { } func (c *controller) deleteNetworkFromStore(n *network) error { - global, err := n.isGlobalScoped() - if err != nil || !global { - return err - } - c.Lock() - cs := c.store - c.Unlock() + cs := c.getDataStore(n.DataScope()) if cs == nil { log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name()) return nil @@ -90,14 +119,6 @@ func (c *controller) deleteNetworkFromStore(n *network) error { return nil } -func (c *controller) getNetworkFromStore(nid string) (*network, error) { - n := network{id: nid} - if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil { - return nil, err - } - return &n, nil -} - func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { ep.Lock() n := ep.network @@ -115,16 +136,9 @@ func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { func (c *controller) updateEndpointToStore(ep *endpoint) error { ep.Lock() - n := ep.network name := ep.name ep.Unlock() - global, err := n.isGlobalScoped() - if err != nil || !global { - return err - } - c.Lock() - cs := c.store - c.Unlock() + cs := c.getDataStore(ep.DataScope()) if cs == nil { log.Debugf("datastore not initialized. endpoint %s is not added to the store", name) return nil @@ -133,26 +147,8 @@ func (c *controller) updateEndpointToStore(ep *endpoint) error { return cs.PutObjectAtomic(ep) } -func (c *controller) getEndpointFromStore(eid string) (*endpoint, error) { - ep := endpoint{id: eid} - if err := c.store.GetObject(datastore.Key(ep.Key()...), &ep); err != nil { - return nil, err - } - return &ep, nil -} - func (c *controller) deleteEndpointFromStore(ep *endpoint) error { - ep.Lock() - n := ep.network - ep.Unlock() - global, err := n.isGlobalScoped() - if err != nil || !global { - return err - } - - c.Lock() - cs := c.store - c.Unlock() + cs := c.getDataStore(ep.DataScope()) if cs == nil { log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name()) return nil @@ -166,12 +162,12 @@ func (c *controller) deleteEndpointFromStore(ep *endpoint) error { } func (c *controller) watchNetworks() error { - if !c.validateDatastoreConfig() { + if !c.validateGlobalStoreConfig() { return nil } c.Lock() - cs := c.store + cs := c.globalStore c.Unlock() networkKey := datastore.Key(datastore.NetworkKeyPrefix) @@ -191,8 +187,7 @@ func (c *controller) watchNetworks() error { lview := c.networks c.Unlock() for k, v := range lview { - global, _ := v.isGlobalScoped() - if global { + if v.isGlobalScoped() { tmpview[k] = v } } @@ -207,7 +202,7 @@ func (c *controller) watchNetworks() error { continue } tmp := network{} - if err := c.store.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { + if err := c.globalStore.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { continue } if err := existing.deleteNetwork(); err != nil { @@ -221,12 +216,12 @@ func (c *controller) watchNetworks() error { } func (n *network) watchEndpoints() error { - if !n.ctrlr.validateDatastoreConfig() { + if !n.ctrlr.validateGlobalStoreConfig() { return nil } n.Lock() - cs := n.ctrlr.store + cs := n.ctrlr.globalStore tmp := endpoint{network: n} n.stopWatchCh = make(chan struct{}) stopCh := n.stopWatchCh @@ -251,28 +246,11 @@ func (n *network) watchEndpoints() error { lview := n.endpoints n.Unlock() for k, v := range lview { - global, _ := v.network.isGlobalScoped() - if global { + if v.network.isGlobalScoped() { tmpview[k] = v } } - for _, epe := range eps { - var ep endpoint - err := json.Unmarshal(epe.Value, &ep) - if err != nil { - log.Error(err) - continue - } - delete(tmpview, ep.id) - ep.SetIndex(epe.LastIndex) - ep.network = n - if n.ctrlr.processEndpointUpdate(&ep) { - err = n.ctrlr.newEndpointFromStore(epe.Key, &ep) - if err != nil { - log.Error(err) - } - } - } + n.ctrlr.processEndpointsUpdate(eps, &tmpview) // Delete processing for k := range tmpview { n.Lock() @@ -381,3 +359,67 @@ func ensureKeys(key string, cs datastore.DataStore) error { } return cs.KVStore().Put(key, []byte{}, nil) } + +func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCfg { + if cfg != nil && cfg.LocalStore.Client.Provider != "" && cfg.LocalStore.Client.Address != "" { + return &cfg.LocalStore + } + return &defaultLocalStoreConfig +} + +func (c *controller) getNetworksFromLocalStore() ([]*store.KVPair, error) { + c.Lock() + cs := c.localStore + c.Unlock() + return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix)) +} + +func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) { + c.Lock() + if dataScope == datastore.GlobalScope { + dataStore = c.globalStore + } else if dataScope == datastore.LocalScope { + dataStore = c.localStore + } + c.Unlock() + return +} + +func (c *controller) getEndpointsFromLocalStore() ([]*store.KVPair, error) { + c.Lock() + cs := c.localStore + c.Unlock() + return cs.KVStore().List(datastore.Key(datastore.EndpointKeyPrefix)) +} + +func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) { + for _, epe := range eps { + var ep endpoint + err := json.Unmarshal(epe.Value, &ep) + if err != nil { + log.Error(err) + continue + } + if prune != nil { + delete(*prune, ep.id) + } + ep.SetIndex(epe.LastIndex) + if nid, err := ep.networkIDFromKey(epe.Key); err != nil { + log.Error(err) + continue + } else { + if n, err := c.NetworkByID(nid); err != nil { + log.Error(err) + continue + } else { + ep.network = n.(*network) + } + } + if c.processEndpointUpdate(&ep) { + err = c.newEndpointFromStore(epe.Key, &ep) + if err != nil { + log.Error(err) + } + } + } +}