From d74384b1d4b040a7b9d38c3604d60c713daa9179 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Mon, 5 Oct 2015 04:18:10 -0700 Subject: [PATCH 1/4] Add local store caching support Add local scope store caching support as well as do some refactoring to make it datastore scope aware and manage scope specific config. Signed-off-by: Jana Radhakrishnan --- libnetwork/datastore/cache.go | 153 +++++++++++ libnetwork/datastore/datastore.go | 344 ++++++++++++++++++++++--- libnetwork/datastore/datastore_test.go | 9 +- 3 files changed, 461 insertions(+), 45 deletions(-) create mode 100644 libnetwork/datastore/cache.go diff --git a/libnetwork/datastore/cache.go b/libnetwork/datastore/cache.go new file mode 100644 index 0000000000..08c8ac4839 --- /dev/null +++ b/libnetwork/datastore/cache.go @@ -0,0 +1,153 @@ +package datastore + +import ( + "fmt" + "sync" + + "github.com/docker/libkv/store" + "github.com/docker/libkv/store/boltdb" +) + +type kvMap map[string]KVObject + +type cache struct { + sync.Mutex + kmm map[string]kvMap + ds *datastore +} + +func newCache(ds *datastore) *cache { + return &cache{kmm: make(map[string]kvMap), ds: ds} +} + +func (c *cache) kmap(kvObject KVObject) (kvMap, error) { + var err error + + c.Lock() + keyPrefix := Key(kvObject.KeyPrefix()...) + kmap, ok := c.kmm[keyPrefix] + c.Unlock() + + if ok { + return kmap, nil + } + + kmap = kvMap{} + + // Bail out right away if the kvObject does not implement KVConstructor + ctor, ok := kvObject.(KVConstructor) + if !ok { + return nil, fmt.Errorf("error while populating kmap, object does not implement KVConstructor interface") + } + + kvList, err := c.ds.store.List(keyPrefix) + if err != nil { + // In case of BoltDB it may return ErrBoltBucketNotFound when no writes + // have ever happened on the db bucket. So check for both err codes + if err == store.ErrKeyNotFound || err == boltdb.ErrBoltBucketNotFound { + // If the store doesn't have anything then there is nothing to + // populate in the cache. Just bail out. + goto out + } + + return nil, fmt.Errorf("error while populating kmap: %v", err) + } + + for _, kvPair := range kvList { + // Ignore empty kvPair values + if len(kvPair.Value) == 0 { + continue + } + + dstO := ctor.New() + err = dstO.SetValue(kvPair.Value) + if err != nil { + return nil, err + } + + // Make sure the object has a correct view of the DB index in + // case we need to modify it and update the DB. + dstO.SetIndex(kvPair.LastIndex) + + kmap[Key(dstO.Key()...)] = dstO + } + +out: + // There may multiple go routines racing to fill the + // cache. The one which places the kmap in c.kmm first + // wins. The others should just use what the first populated. + c.Lock() + kmapNew, ok := c.kmm[keyPrefix] + if ok { + c.Unlock() + return kmapNew, nil + } + + c.kmm[keyPrefix] = kmap + c.Unlock() + + return kmap, nil +} + +func (c *cache) add(kvObject KVObject) error { + kmap, err := c.kmap(kvObject) + if err != nil { + return err + } + + c.Lock() + kmap[Key(kvObject.Key()...)] = kvObject + c.Unlock() + return nil +} + +func (c *cache) del(kvObject KVObject) error { + kmap, err := c.kmap(kvObject) + if err != nil { + return err + } + + c.Lock() + delete(kmap, Key(kvObject.Key()...)) + c.Unlock() + return nil +} + +func (c *cache) get(key string, kvObject KVObject) error { + kmap, err := c.kmap(kvObject) + if err != nil { + return err + } + + c.Lock() + defer c.Unlock() + + o, ok := kmap[Key(kvObject.Key()...)] + if !ok { + return ErrKeyNotFound + } + + ctor, ok := o.(KVConstructor) + if !ok { + return fmt.Errorf("kvobject does not implement KVConstructor interface. could not get object") + } + + return ctor.CopyTo(kvObject) +} + +func (c *cache) list(kvObject KVObject) ([]KVObject, error) { + kmap, err := c.kmap(kvObject) + if err != nil { + return nil, err + } + + c.Lock() + defer c.Unlock() + + var kvol []KVObject + for _, v := range kmap { + kvol = append(kvol, v) + } + + return kvol, nil +} diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 927b67d23e..a0edd76a83 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -1,8 +1,11 @@ package datastore import ( + "fmt" + "log" "reflect" "strings" + "time" "github.com/docker/libkv" "github.com/docker/libkv/store" @@ -10,26 +13,37 @@ import ( "github.com/docker/libkv/store/consul" "github.com/docker/libkv/store/etcd" "github.com/docker/libkv/store/zookeeper" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/types" ) //DataStore exported type DataStore interface { // GetObject gets data from datastore and unmarshals to the specified object - GetObject(key string, o KV) error + GetObject(key string, o KVObject) error // PutObject adds a new Record based on an object into the datastore - PutObject(kvObject KV) error + PutObject(kvObject KVObject) error // PutObjectAtomic provides an atomic add and update operation for a Record - PutObjectAtomic(kvObject KV) error + PutObjectAtomic(kvObject KVObject) error // DeleteObject deletes a record - DeleteObject(kvObject KV) error + DeleteObject(kvObject KVObject) error // DeleteObjectAtomic performs an atomic delete operation - DeleteObjectAtomic(kvObject KV) error + DeleteObjectAtomic(kvObject KVObject) error // DeleteTree deletes a record - DeleteTree(kvObject KV) error + DeleteTree(kvObject KVObject) error + // Watchable returns whether the store is watchable are not + Watchable() bool + // Watch for changes on a KVObject + Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) + // List returns of a list of KVObjects belonging to the parent + // key. The caller must pass a KVObject of the same type as + // the objects that need to be listed + List(string, KVObject) ([]KVObject, error) + // Scope returns the scope of the store + Scope() string // KVStore returns access to the KV Store KVStore() store.Store + // Close closes the data store + Close() } // ErrKeyModified is raised for an atomic update when the update is working on a stale state @@ -39,11 +53,13 @@ var ( ) type datastore struct { + scope string store store.Store + cache *cache } -//KV Key Value interface used by objects to be part of the DataStore -type KV interface { +// KVObject is Key/Value interface used by objects to be part of the DataStore +type KVObject interface { // Key method lets an object to provide the Key to be used in KV Store Key() []string // KeyPrefix method lets an object to return immediate parent key that can be used for tree walk @@ -60,19 +76,40 @@ type KV interface { // When SetIndex() is called, the object has been stored. Exists() bool // DataScope indicates the storage scope of the KV object - DataScope() DataScope + DataScope() string // Skip provides a way for a KV Object to avoid persisting it in the KV Store Skip() bool } -// DataScope indicates the storage scope -type DataScope int +// KVConstructor interface defines methods which can construct a KVObject from another. +type KVConstructor interface { + // New returns a new object which is created based on the + // source object + New() KVObject + // CopyTo deep copies the contents of the implementing object + // to the passed destination object + CopyTo(KVObject) error +} + +// ScopeCfg represents Datastore configuration. +type ScopeCfg struct { + Embedded bool + Client ScopeClientCfg +} + +// ScopeClientCfg represents Datastore Client-only mode configuration +type ScopeClientCfg struct { + Provider string + Address string + Config *store.Config +} const ( // LocalScope indicates to store the KV object in local datastore such as boltdb - LocalScope DataScope = iota + LocalScope = "local" // GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper - GlobalScope + GlobalScope = "global" + defaultPrefix = "/var/lib/docker/network/files" ) const ( @@ -82,6 +119,27 @@ const ( EndpointKeyPrefix = "endpoint" ) +var ( + defaultScopes = makeDefaultScopes() +) + +func makeDefaultScopes() map[string]*ScopeCfg { + def := make(map[string]*ScopeCfg) + def[LocalScope] = &ScopeCfg{ + Embedded: true, + Client: ScopeClientCfg{ + Provider: "boltdb", + Address: defaultPrefix + "/boltdb.db", + Config: &store.Config{ + Bucket: "libnetwork", + ConnectionTimeout: 3 * time.Second, + }, + }, + } + + return def +} + var rootChain = []string{"docker", "libnetwork"} func init() { @@ -91,6 +149,28 @@ func init() { boltdb.Register() } +// DefaultScopes returns a map of default scopes and it's config for clients to use. +func DefaultScopes(dataDir string) map[string]*ScopeCfg { + if dataDir != "" { + defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/boltdb.db" + return defaultScopes + } + + defaultScopes[LocalScope].Client.Address = defaultPrefix + "/boltdb.db" + return defaultScopes +} + +// IsValid checks if the scope config has valid configuration. +func (cfg *ScopeCfg) IsValid() bool { + if cfg == nil || + strings.TrimSpace(cfg.Client.Provider) == "" || + strings.TrimSpace(cfg.Client.Address) == "" { + return false + } + + return true +} + //Key provides convenient method to create a Key func Key(key ...string) string { keychain := append(rootChain, key...) @@ -110,7 +190,11 @@ func ParseKey(key string) ([]string, error) { } // newClient used to connect to KV Store -func newClient(kv string, addrs string, config *store.Config) (DataStore, error) { +func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (DataStore, error) { + if cached && scope != LocalScope { + return nil, fmt.Errorf("caching supported only for scope %s", LocalScope) + } + if config == nil { config = &store.Config{} } @@ -118,22 +202,82 @@ func newClient(kv string, addrs string, config *store.Config) (DataStore, error) if err != nil { return nil, err } - ds := &datastore{store: store} + + ds := &datastore{scope: scope, store: store} + if cached { + ds.cache = newCache(ds) + } + return ds, nil } // NewDataStore creates a new instance of LibKV data store -func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) { - if cfg == nil { - return nil, types.BadRequestErrorf("invalid configuration passed to datastore") +func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) { + if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" { + c, ok := defaultScopes[scope] + if !ok || c.Client.Provider == "" || c.Client.Address == "" { + return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope) + } + + cfg = c } - // TODO : cfg.Embedded case - return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config) + + var cached bool + if scope == LocalScope { + cached = true + } + + return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached) } -// NewCustomDataStore can be used by clients to plugin cusom datatore that adhers to store.Store -func NewCustomDataStore(customStore store.Store) DataStore { - return &datastore{store: customStore} +func (ds *datastore) Close() { + ds.store.Close() +} + +func (ds *datastore) Scope() string { + return ds.scope +} + +func (ds *datastore) Watchable() bool { + return ds.scope != LocalScope +} + +func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) { + sCh := make(chan struct{}) + + ctor, ok := kvObject.(KVConstructor) + if !ok { + return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject) + } + + kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh) + if err != nil { + return nil, err + } + + kvoCh := make(chan KVObject) + + go func() { + for { + select { + case <-stopCh: + close(sCh) + return + case kvPair := <-kvpCh: + dstO := ctor.New() + + if err := dstO.SetValue(kvPair.Value); err != nil { + log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value)) + break + } + + dstO.SetIndex(kvPair.LastIndex) + kvoCh <- dstO + } + } + }() + + return kvoCh, nil } func (ds *datastore) KVStore() store.Store { @@ -141,40 +285,71 @@ func (ds *datastore) KVStore() store.Store { } // PutObjectAtomic adds a new Record based on an object into the datastore -func (ds *datastore) PutObjectAtomic(kvObject KV) error { +func (ds *datastore) PutObjectAtomic(kvObject KVObject) error { + var ( + previous *store.KVPair + pair *store.KVPair + err error + ) + if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") } + kvObjValue := kvObject.Value() if kvObjValue == nil { return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...)) } - var previous *store.KVPair + if kvObject.Skip() { + goto add_cache + } + if kvObject.Exists() { previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} } else { previous = nil } - _, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil) + + _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil) if err != nil { return err } kvObject.SetIndex(pair.LastIndex) + +add_cache: + if ds.cache != nil { + return ds.cache.add(kvObject) + } + return nil } // PutObject adds a new Record based on an object into the datastore -func (ds *datastore) PutObject(kvObject KV) error { +func (ds *datastore) PutObject(kvObject KVObject) error { if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") } - return ds.putObjectWithKey(kvObject, kvObject.Key()...) + + if kvObject.Skip() { + goto add_cache + } + + if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil { + return err + } + +add_cache: + if ds.cache != nil { + return ds.cache.add(kvObject) + } + + return nil } -func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error { +func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error { kvObjValue := kvObject.Value() if kvObjValue == nil { @@ -184,39 +359,128 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error { } // GetObject returns a record matching the key -func (ds *datastore) GetObject(key string, o KV) error { +func (ds *datastore) GetObject(key string, o KVObject) error { + if ds.cache != nil { + return ds.cache.get(key, o) + } + kvPair, err := ds.store.Get(key) if err != nil { return err } - err = o.SetValue(kvPair.Value) - if err != nil { + + if err := o.SetValue(kvPair.Value); err != nil { return err } - // Make sure the object has a correct view of the DB index in case we need to modify it - // and update the DB. + // Make sure the object has a correct view of the DB index in + // case we need to modify it and update the DB. o.SetIndex(kvPair.LastIndex) return nil } +func (ds *datastore) ensureKey(key string) error { + exists, err := ds.store.Exists(key) + if err != nil { + return err + } + if exists { + return nil + } + return ds.store.Put(key, []byte{}, nil) +} + +func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) { + if ds.cache != nil { + return ds.cache.list(kvObject) + } + + // Bail out right away if the kvObject does not implement KVConstructor + ctor, ok := kvObject.(KVConstructor) + if !ok { + return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface") + } + + // Make sure the parent key exists + if err := ds.ensureKey(key); err != nil { + return nil, err + } + + kvList, err := ds.store.List(key) + if err != nil { + return nil, err + } + + var kvol []KVObject + for _, kvPair := range kvList { + if len(kvPair.Value) == 0 { + continue + } + + dstO := ctor.New() + if err := dstO.SetValue(kvPair.Value); err != nil { + return nil, err + } + + // Make sure the object has a correct view of the DB index in + // case we need to modify it and update the DB. + dstO.SetIndex(kvPair.LastIndex) + + kvol = append(kvol, dstO) + } + + return kvol, nil +} + // DeleteObject unconditionally deletes a record from the store -func (ds *datastore) DeleteObject(kvObject KV) error { +func (ds *datastore) DeleteObject(kvObject KVObject) error { + // cleaup the cache first + if ds.cache != nil { + ds.cache.del(kvObject) + } + + if kvObject.Skip() { + return nil + } + return ds.store.Delete(Key(kvObject.Key()...)) } // DeleteObjectAtomic performs atomic delete on a record -func (ds *datastore) DeleteObjectAtomic(kvObject KV) error { +func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error { if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") } previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} - _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous) - return err + + if kvObject.Skip() { + goto del_cache + } + + if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil { + return err + } + +del_cache: + // cleanup the cache only if AtomicDelete went through successfully + if ds.cache != nil { + return ds.cache.del(kvObject) + } + + return nil } // DeleteTree unconditionally deletes a record from the store -func (ds *datastore) DeleteTree(kvObject KV) error { +func (ds *datastore) DeleteTree(kvObject KVObject) error { + // cleaup the cache first + if ds.cache != nil { + ds.cache.del(kvObject) + } + + if kvObject.Skip() { + return nil + } + return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...)) } diff --git a/libnetwork/datastore/datastore_test.go b/libnetwork/datastore/datastore_test.go index 59290cbbe7..4e93abdefc 100644 --- a/libnetwork/datastore/datastore_test.go +++ b/libnetwork/datastore/datastore_test.go @@ -5,7 +5,6 @@ import ( "reflect" "testing" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/options" _ "github.com/docker/libnetwork/testutils" "github.com/stretchr/testify/assert" @@ -15,7 +14,7 @@ var dummyKey = "dummy" // NewCustomDataStore can be used by other Tests in order to use custom datastore func NewTestDataStore() DataStore { - return &datastore{store: NewMockStore()} + return &datastore{scope: LocalScope, store: NewMockStore()} } func TestKey(t *testing.T) { @@ -38,11 +37,11 @@ func TestParseKey(t *testing.T) { } func TestInvalidDataStore(t *testing.T) { - config := &config.DatastoreCfg{} + config := &ScopeCfg{} config.Embedded = false config.Client.Provider = "invalid" config.Client.Address = "localhost:8500" - _, err := NewDataStore(config) + _, err := NewDataStore(GlobalScope, config) if err == nil { t.Fatal("Invalid Datastore connection configuration must result in a failure") } @@ -167,7 +166,7 @@ func (n *dummyObject) Skip() bool { return n.SkipSave } -func (n *dummyObject) DataScope() DataScope { +func (n *dummyObject) DataScope() string { return LocalScope } From 71e14dd52a3e8f728d4b4997f2441fcc1dcde16c Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Mon, 5 Oct 2015 04:21:15 -0700 Subject: [PATCH 2/4] Remove always-on watch for networks and endpoints Always on watching of networks and endpoints can affect scalability of the cluster beyond a few nodes. Remove pro active watching and watch only the objects you are interested in. Signed-off-by: Jana Radhakrishnan --- libnetwork/api/api_test.go | 52 +- libnetwork/cmd/dnet/dnet.go | 30 +- libnetwork/config/config.go | 70 ++- libnetwork/controller.go | 167 +++-- libnetwork/driverapi/driverapi.go | 8 +- libnetwork/drivers.go | 7 +- libnetwork/drivers/overlay/ov_endpoint.go | 12 +- libnetwork/drivers/overlay/ov_network.go | 80 +-- libnetwork/drivers/overlay/overlay.go | 7 +- libnetwork/endpoint.go | 218 ++++--- libnetwork/endpoint_info.go | 53 +- libnetwork/libnetwork_internal_test.go | 6 - libnetwork/libnetwork_test.go | 36 +- libnetwork/network.go | 318 +++++++--- libnetwork/sandbox.go | 23 +- libnetwork/sandbox_test.go | 12 +- libnetwork/store.go | 706 ++++++++++------------ libnetwork/store_test.go | 16 +- 18 files changed, 1022 insertions(+), 799 deletions(-) diff --git a/libnetwork/api/api_test.go b/libnetwork/api/api_test.go index d85a15a185..d64ca008c6 100644 --- a/libnetwork/api/api_test.go +++ b/libnetwork/api/api_test.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/pkg/reexec" "github.com/docker/libnetwork" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" "github.com/docker/libnetwork/testutils" @@ -88,11 +89,13 @@ func i2sbL(i interface{}) []*sandboxResource { } func createTestNetwork(t *testing.T, network string) (libnetwork.NetworkController, libnetwork.Network) { + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) } - defer c.Stop() netOption := options.Generic{ netlabel.GenericData: options.Generic{ @@ -175,6 +178,9 @@ func TestJson(t *testing.T) { func TestCreateDeleteNetwork(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -249,6 +255,9 @@ func TestCreateDeleteNetwork(t *testing.T) { func TestGetNetworksAndEndpoints(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -518,6 +527,9 @@ func TestGetNetworksAndEndpoints(t *testing.T) { func TestProcGetServices(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -686,6 +698,7 @@ func TestProcGetService(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() ep1, err := nw.CreateEndpoint("db") if err != nil { t.Fatal(err) @@ -738,6 +751,8 @@ func TestProcPublishUnpublishService(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, _ := createTestNetwork(t, "network") + defer c.Stop() + vars := make(map[string]string) vbad, err := json.Marshal("bad service create data") @@ -870,6 +885,7 @@ func TestAttachDetachBackend(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() ep1, err := nw.CreateEndpoint("db") if err != nil { t.Fatal(err) @@ -994,6 +1010,9 @@ func TestAttachDetachBackend(t *testing.T) { } func TestDetectGetNetworksInvalidQueryComposition(t *testing.T) { + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1011,6 +1030,7 @@ func TestDetectGetEndpointsInvalidQueryComposition(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, _ := createTestNetwork(t, "network") + defer c.Stop() vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"} _, errRsp := procGetEndpoints(c, vars, nil) @@ -1023,6 +1043,7 @@ func TestDetectGetServicesInvalidQueryComposition(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, _ := createTestNetwork(t, "network") + defer c.Stop() vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"} _, errRsp := procGetServices(c, vars, nil) @@ -1040,6 +1061,8 @@ func TestFindNetworkUtil(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() + nid := nw.ID() _, errRsp := findNetwork(c, "", byName) @@ -1102,6 +1125,9 @@ func TestFindNetworkUtil(t *testing.T) { func TestCreateDeleteEndpoints(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1225,6 +1251,9 @@ func TestCreateDeleteEndpoints(t *testing.T) { func TestJoinLeave(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1382,6 +1411,8 @@ func TestFindEndpointUtilPanic(t *testing.T) { defer testutils.SetupTestOSContext(t)() defer checkPanic(t) c, nw := createTestNetwork(t, "network") + defer c.Stop() + nid := nw.ID() findEndpoint(c, nid, "", byID, -1) } @@ -1390,6 +1421,8 @@ func TestFindServiceUtilPanic(t *testing.T) { defer testutils.SetupTestOSContext(t)() defer checkPanic(t) c, _ := createTestNetwork(t, "network") + defer c.Stop() + findService(c, "random_service", -1) } @@ -1397,6 +1430,8 @@ func TestFindEndpointUtil(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() + nid := nw.ID() ep, err := nw.CreateEndpoint("secondEp", nil) @@ -1443,7 +1478,8 @@ func TestFindEndpointUtil(t *testing.T) { t.Fatalf("Unexepected failure: %v", errRsp) } - if ep0 != ep1 || ep0 != ep2 || ep0 != ep3 || ep0 != ep4 || ep0 != ep5 { + if ep0.ID() != ep1.ID() || ep0.ID() != ep2.ID() || + ep0.ID() != ep3.ID() || ep0.ID() != ep4.ID() || ep0.ID() != ep5.ID() { t.Fatalf("Diffenrent queries returned different endpoints") } @@ -1665,6 +1701,9 @@ func TestwriteJSON(t *testing.T) { func TestHttpHandlerUninit(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1732,6 +1771,9 @@ func TestHttpHandlerBadBody(t *testing.T) { rsp := newWriter() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1765,6 +1807,9 @@ func TestEndToEnd(t *testing.T) { rsp := newWriter() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -2213,6 +2258,9 @@ func TestEndToEndErrorMessage(t *testing.T) { rsp := newWriter() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) diff --git a/libnetwork/cmd/dnet/dnet.go b/libnetwork/cmd/dnet/dnet.go index 738081bb98..7bac7c7881 100644 --- a/libnetwork/cmd/dnet/dnet.go +++ b/libnetwork/cmd/dnet/dnet.go @@ -22,10 +22,12 @@ import ( "github.com/docker/docker/pkg/reexec" "github.com/Sirupsen/logrus" + psignal "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/term" "github.com/docker/libnetwork" "github.com/docker/libnetwork/api" "github.com/docker/libnetwork/config" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" @@ -76,6 +78,7 @@ func processConfig(cfg *config.Config) []config.Option { if cfg == nil { return options } + dn := "bridge" if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" { dn = cfg.Daemon.DefaultNetwork @@ -91,12 +94,12 @@ func processConfig(cfg *config.Config) []config.Option { if cfg.Daemon.Labels != nil { options = append(options, config.OptionLabels(cfg.Daemon.Labels)) } - if strings.TrimSpace(cfg.GlobalStore.Client.Provider) != "" { - options = append(options, config.OptionKVProvider(cfg.GlobalStore.Client.Provider)) - } - if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" { - options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address)) + + if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() { + options = append(options, config.OptionKVProvider(dcfg.Client.Provider)) + options = append(options, config.OptionKVProviderURL(dcfg.Client.Address)) } + dOptions, err := startDiscovery(&cfg.Cluster) if err != nil { logrus.Infof("Skipping discovery : %s", err.Error()) @@ -182,8 +185,9 @@ func createDefaultNetwork(c libnetwork.NetworkController) { genericOption[netlabel.GenericData] = map[string]interface{}{ "BridgeName": nw, } - networkOption := libnetwork.NetworkOptionGeneric(genericOption) - createOptions = append(createOptions, networkOption) + createOptions = append(createOptions, + libnetwork.NetworkOptionGeneric(genericOption), + libnetwork.NetworkOptionPersist(false)) } _, err := c.NewNetwork(d, nw, createOptions...) if err != nil { @@ -214,6 +218,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { fmt.Println("Error starting dnetDaemon :", err) return err } + createDefaultNetwork(controller) httpHandler := api.NewHTTPHandler(controller) r := mux.NewRouter().StrictSlash(false) @@ -231,10 +236,21 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler) handleSignals(controller) + setupDumpStackTrap() return http.ListenAndServe(d.addr, r) } +func setupDumpStackTrap() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGUSR1) + go func() { + for range c { + psignal.DumpStacks() + } + }() +} + func handleSignals(controller libnetwork.NetworkController) { c := make(chan os.Signal, 1) signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT} diff --git a/libnetwork/config/config.go b/libnetwork/config/config.go index 96c8dab677..0121c06630 100644 --- a/libnetwork/config/config.go +++ b/libnetwork/config/config.go @@ -7,19 +7,21 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/discovery" "github.com/docker/libkv/store" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/netlabel" ) // Config encapsulates configurations of various Libnetwork components type Config struct { - Daemon DaemonCfg - Cluster ClusterCfg - GlobalStore, LocalStore DatastoreCfg + Daemon DaemonCfg + Cluster ClusterCfg + Scopes map[string]*datastore.ScopeCfg } // DaemonCfg represents libnetwork core configuration type DaemonCfg struct { Debug bool + DataDir string DefaultNetwork string DefaultDriver string Labels []string @@ -34,26 +36,28 @@ type ClusterCfg struct { Heartbeat uint64 } -// DatastoreCfg represents Datastore configuration. -type DatastoreCfg struct { - Embedded bool - Client DatastoreClientCfg -} - -// DatastoreClientCfg represents Datastore Client-only mode configuration -type DatastoreClientCfg struct { - Provider string - Address string - Config *store.Config +// LoadDefaultScopes loads default scope configs for scopes which +// doesn't have explicit user specified configs. +func (c *Config) LoadDefaultScopes(dataDir string) { + for k, v := range datastore.DefaultScopes(dataDir) { + if _, ok := c.Scopes[k]; !ok { + c.Scopes[k] = v + } + } } // ParseConfig parses the libnetwork configuration file func ParseConfig(tomlCfgFile string) (*Config, error) { - var cfg Config - if _, err := toml.DecodeFile(tomlCfgFile, &cfg); err != nil { + cfg := &Config{ + Scopes: map[string]*datastore.ScopeCfg{}, + } + + if _, err := toml.DecodeFile(tomlCfgFile, cfg); err != nil { return nil, err } - return &cfg, nil + + cfg.LoadDefaultScopes(cfg.Daemon.DataDir) + return cfg, nil } // Option is a option setter function type used to pass varios configurations @@ -98,7 +102,10 @@ func OptionLabels(labels []string) Option { func OptionKVProvider(provider string) Option { return func(c *Config) { log.Infof("Option OptionKVProvider: %s", provider) - c.GlobalStore.Client.Provider = strings.TrimSpace(provider) + if _, ok := c.Scopes[datastore.GlobalScope]; !ok { + c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.GlobalScope].Client.Provider = strings.TrimSpace(provider) } } @@ -106,7 +113,10 @@ func OptionKVProvider(provider string) Option { func OptionKVProviderURL(url string) Option { return func(c *Config) { log.Infof("Option OptionKVProviderURL: %s", url) - c.GlobalStore.Client.Address = strings.TrimSpace(url) + if _, ok := c.Scopes[datastore.GlobalScope]; !ok { + c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.GlobalScope].Client.Address = strings.TrimSpace(url) } } @@ -124,6 +134,13 @@ func OptionDiscoveryAddress(address string) Option { } } +// OptionDataDir function returns an option setter for data folder +func OptionDataDir(dataDir string) Option { + return func(c *Config) { + c.Daemon.DataDir = dataDir + } +} + // ProcessOptions processes options and stores it in config func (c *Config) ProcessOptions(options ...Option) { for _, opt := range options { @@ -145,7 +162,10 @@ func IsValidName(name string) bool { func OptionLocalKVProvider(provider string) Option { return func(c *Config) { log.Infof("Option OptionLocalKVProvider: %s", provider) - c.LocalStore.Client.Provider = strings.TrimSpace(provider) + if _, ok := c.Scopes[datastore.LocalScope]; !ok { + c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.LocalScope].Client.Provider = strings.TrimSpace(provider) } } @@ -153,7 +173,10 @@ func OptionLocalKVProvider(provider string) Option { func OptionLocalKVProviderURL(url string) Option { return func(c *Config) { log.Infof("Option OptionLocalKVProviderURL: %s", url) - c.LocalStore.Client.Address = strings.TrimSpace(url) + if _, ok := c.Scopes[datastore.LocalScope]; !ok { + c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.LocalScope].Client.Address = strings.TrimSpace(url) } } @@ -161,6 +184,9 @@ func OptionLocalKVProviderURL(url string) Option { func OptionLocalKVProviderConfig(config *store.Config) Option { return func(c *Config) { log.Infof("Option OptionLocalKVProviderConfig: %v", config) - c.LocalStore.Client.Config = config + if _, ok := c.Scopes[datastore.LocalScope]; !ok { + c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.LocalScope].Client.Config = config } } diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 6fdac5e698..dbe13a5490 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -124,73 +124,71 @@ type ipamData struct { } type driverTable map[string]*driverData + +//type networkTable map[string]*network +//type endpointTable map[string]*endpoint type ipamTable map[string]*ipamData -type networkTable map[string]*network -type endpointTable map[string]*endpoint type sandboxTable map[string]*sandbox type controller struct { - id string - networks networkTable - drivers driverTable - ipamDrivers ipamTable - sandboxes sandboxTable - cfg *config.Config - globalStore, localStore datastore.DataStore - discovery hostdiscovery.HostDiscovery - extKeyListener net.Listener + id string + //networks networkTable + drivers driverTable + ipamDrivers ipamTable + sandboxes sandboxTable + cfg *config.Config + stores []datastore.DataStore + discovery hostdiscovery.HostDiscovery + extKeyListener net.Listener + watchCh chan *endpoint + unWatchCh chan *endpoint + svcDb map[string]svcMap sync.Mutex } // New creates a new instance of network controller. func New(cfgOptions ...config.Option) (NetworkController, error) { var cfg *config.Config + cfg = &config.Config{ + Daemon: config.DaemonCfg{ + DriverCfg: make(map[string]interface{}), + }, + Scopes: make(map[string]*datastore.ScopeCfg), + } + if len(cfgOptions) > 0 { - cfg = &config.Config{ - Daemon: config.DaemonCfg{ - DriverCfg: make(map[string]interface{}), - }, - } cfg.ProcessOptions(cfgOptions...) } + cfg.LoadDefaultScopes(cfg.Daemon.DataDir) + c := &controller{ id: stringid.GenerateRandomID(), cfg: cfg, - networks: networkTable{}, sandboxes: sandboxTable{}, drivers: driverTable{}, - ipamDrivers: ipamTable{}} - if err := initDrivers(c); err != nil { + ipamDrivers: ipamTable{}, + svcDb: make(map[string]svcMap), + } + + if err := c.initStores(); err != nil { return nil, err } - if cfg != nil { - if err := c.initGlobalStore(); err != nil { - // Failing to initalize datastore is a bad situation to be in. - // But it cannot fail creating the Controller - log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err) - } - if err := c.initLocalStore(); err != nil { - log.Debugf("Failed to Initialize LocalDatastore due to %v.", err) - } - } - - if err := initIpams(c, c.localStore, c.globalStore); err != nil { - return nil, err - } - - if cfg != nil { - if err := c.restoreFromGlobalStore(); err != nil { - log.Debugf("Failed to restore from global Datastore due to %v", err) - } + if cfg != nil && cfg.Cluster.Watcher != nil { if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil { // Failing to initalize discovery is a bad situation to be in. // But it cannot fail creating the Controller log.Debugf("Failed to Initialize Discovery : %v", err) } - if err := c.restoreFromLocalStore(); err != nil { - log.Debugf("Failed to restore from local Datastore due to %v", err) - } + } + + if err := initDrivers(c); err != nil { + return nil, err + } + + if err := initIpams(c, c.getStore(datastore.LocalScope), + c.getStore(datastore.GlobalScope)); err != nil { + return nil, err } if err := c.startExternalKeyListener(); err != nil { @@ -325,15 +323,6 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti if !config.IsValidName(name) { return nil, ErrInvalidName(name) } - // Check if a network already exists with the specified network name - c.Lock() - for _, n := range c.networks { - if n.name == name { - c.Unlock() - return nil, NetworkNameError(name) - } - } - c.Unlock() // Construct the network object network := &network{ @@ -342,13 +331,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti ipamType: ipamapi.DefaultIPAM, id: stringid.GenerateRandomID(), ctrlr: c, - endpoints: endpointTable{}, persist: true, + drvOnce: &sync.Once{}, } network.processOptions(options...) - if _, err := c.loadNetworkDriver(network); err != nil { + // Make sure we have a driver available for this network type + // before we allocate anything. + if _, err := network.driver(); err != nil { return nil, err } @@ -364,7 +355,16 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti } }() - if err = c.addNetwork(network); err != nil { + // addNetwork can be called for local scope network lazily when + // an endpoint is created after a restart and the network was + // created in previous life. Make sure you wrap around the driver + // notification of network creation in once call so that the driver + // invoked only once in case both the network and endpoint creation + // happens in the same lifetime. + network.drvOnce.Do(func() { + err = c.addNetwork(network) + }) + if err != nil { return nil, err } @@ -380,35 +380,28 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti } func (c *controller) addNetwork(n *network) error { - if _, err := c.loadNetworkDriver(n); err != nil { + d, err := n.driver() + if err != nil { return err } - n.Lock() - d := n.driver - n.Unlock() // Create the network if err := d.CreateNetwork(n.id, n.generic, n.getIPv4Data(), n.getIPv6Data()); err != nil { return err } - if n.isGlobalScoped() { - if err := n.watchEndpoints(); err != nil { - return err - } - } - c.Lock() - c.networks[n.id] = n - c.Unlock() return nil } func (c *controller) Networks() []Network { - c.Lock() - defer c.Unlock() + var list []Network - list := make([]Network, 0, len(c.networks)) - for _, n := range c.networks { + networks, err := c.getNetworksFromStore() + if err != nil { + log.Error(err) + } + + for _, n := range networks { list = append(list, n) } @@ -450,12 +443,13 @@ func (c *controller) NetworkByID(id string) (Network, error) { if id == "" { return nil, ErrInvalidID(id) } - c.Lock() - defer c.Unlock() - if n, ok := c.networks[id]; ok { - return n, nil + + n, err := c.getNetworkFromStore(id) + if err != nil { + return nil, ErrNoSuchNetwork(id) } - return nil, ErrNoSuchNetwork(id) + + return n, nil } // NewSandbox creates a new sandbox for the passed container id @@ -620,30 +614,7 @@ func (c *controller) getIpamDriver(name string) (ipamapi.Ipam, error) { } func (c *controller) Stop() { - if c.localStore != nil { - c.localStore.KVStore().Close() - } + c.closeStores() c.stopExternalKeyListener() osl.GC() } - -func (c *controller) loadNetworkDriver(n *network) (driverapi.Driver, error) { - // Check if a driver for the specified network type is available - c.Lock() - dd, ok := c.drivers[n.networkType] - c.Unlock() - if !ok { - var err error - dd, err = c.loadDriver(n.networkType) - if err != nil { - return nil, err - } - } - - n.Lock() - n.svcRecords = svcMap{} - n.driver = dd.driver - n.dataScope = dd.capability.DataScope - n.Unlock() - return dd.driver, nil -} diff --git a/libnetwork/driverapi/driverapi.go b/libnetwork/driverapi/driverapi.go index c81e2db34e..bd311d0035 100644 --- a/libnetwork/driverapi/driverapi.go +++ b/libnetwork/driverapi/driverapi.go @@ -1,10 +1,6 @@ package driverapi -import ( - "net" - - "github.com/docker/libnetwork/datastore" -) +import "net" // NetworkPluginEndpointType represents the Endpoint Type used by Plugin system const NetworkPluginEndpointType = "NetworkDriver" @@ -105,7 +101,7 @@ type DriverCallback interface { // Capability represents the high level capabilities of the drivers which libnetwork can make use of type Capability struct { - DataScope datastore.DataScope + DataScope string } // DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on diff --git a/libnetwork/drivers.go b/libnetwork/drivers.go index b50d089b09..d87ec1f612 100644 --- a/libnetwork/drivers.go +++ b/libnetwork/drivers.go @@ -3,6 +3,7 @@ package libnetwork import ( "strings" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipamapi" builtinIpam "github.com/docker/libnetwork/ipams/builtin" @@ -32,9 +33,9 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} { config := make(map[string]interface{}) - if c.validateGlobalStoreConfig() { - config[netlabel.KVProvider] = c.cfg.GlobalStore.Client.Provider - config[netlabel.KVProviderURL] = c.cfg.GlobalStore.Client.Address + if dcfg, ok := c.cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() { + config[netlabel.KVProvider] = dcfg.Client.Provider + config[netlabel.KVProviderURL] = dcfg.Client.Address } for _, label := range c.cfg.Daemon.Labels { diff --git a/libnetwork/drivers/overlay/ov_endpoint.go b/libnetwork/drivers/overlay/ov_endpoint.go index e3c1b88691..7a861a97a3 100644 --- a/libnetwork/drivers/overlay/ov_endpoint.go +++ b/libnetwork/drivers/overlay/ov_endpoint.go @@ -43,12 +43,16 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo, return err } + // Since we perform lazy configuration make sure we try + // configuring the driver when we enter CreateEndpoint since + // CreateNetwork may not be called in every node. + if err := d.configure(); err != nil { + return err + } + n := d.network(nid) if n == nil { - n, err = d.createNetworkfromStore(nid) - if err != nil { - return fmt.Errorf("network id %q not found", nid) - } + return fmt.Errorf("network id %q not found", nid) } ep := &endpoint{ diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go index cb73c9dabe..fa5acd297a 100644 --- a/libnetwork/drivers/overlay/ov_network.go +++ b/libnetwork/drivers/overlay/ov_network.go @@ -45,12 +45,13 @@ type network struct { } func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error { - var err error if id == "" { return fmt.Errorf("invalid network id") } - if err = d.configure(); err != nil { + // Since we perform lazy configuration make sure we try + // configuring the driver when we enter CreateNetwork + if err := d.configure(); err != nil { return err } @@ -71,29 +72,16 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat n.subnets = append(n.subnets, s) } - for { - // If the datastore has the network object already - // there is no need to do a write. - err = d.store.GetObject(datastore.Key(n.Key()...), n) - if err == nil || err != datastore.ErrKeyNotFound { - break - } - - err = n.writeToStore() - if err == nil || err != datastore.ErrKeyModified { - break - } - } - - if err != nil { + if err := n.writeToStore(); err != nil { return fmt.Errorf("failed to update data store for network %v: %v", n.id, err) } + d.addNetwork(n) return nil } -func (d *driver) createNetworkfromStore(nid string) (*network, error) { +/* func (d *driver) createNetworkfromStore(nid string) (*network, error) { n := &network{ id: nid, driver: d, @@ -107,7 +95,7 @@ func (d *driver) createNetworkfromStore(nid string) (*network, error) { return nil, fmt.Errorf("unable to get network %q from data store, %v", nid, err) } return n, nil -} +}*/ func (d *driver) DeleteNetwork(nid string) error { if nid == "" { @@ -313,9 +301,34 @@ func (d *driver) deleteNetwork(nid string) { func (d *driver) network(nid string) *network { d.Lock() - defer d.Unlock() + networks := d.networks + d.Unlock() - return d.networks[nid] + n, ok := networks[nid] + if !ok { + n = d.getNetworkFromStore(nid) + if n != nil { + n.driver = d + n.endpoints = endpointTable{} + n.once = &sync.Once{} + networks[nid] = n + } + } + + return n +} + +func (d *driver) getNetworkFromStore(nid string) *network { + if d.store == nil { + return nil + } + + n := &network{id: nid} + if err := d.store.GetObject(datastore.Key(n.Key()...), n); err != nil { + return nil + } + + return n } func (n *network) sandbox() osl.Sandbox { @@ -408,30 +421,23 @@ func (n *network) SetValue(value []byte) error { subnetIP, _ := types.ParseCIDR(subnetIPstr) gwIP, _ := types.ParseCIDR(gwIPstr) - // If the network is being created by reading from the - // datastore subnets have to created. If the network - // already exists update only the subnets' vni field - if len(n.subnets) == 0 { - s := &subnet{ - subnetIP: subnetIP, - gwIP: gwIP, - vni: vni, - once: &sync.Once{}, - } - n.subnets = append(n.subnets, s) - return nil + s := &subnet{ + subnetIP: subnetIP, + gwIP: gwIP, + vni: vni, + once: &sync.Once{}, } + n.subnets = append(n.subnets, s) sNet := n.getMatchingSubnet(subnetIP) if sNet != nil { - if vni != 0 { - sNet.vni = vni - } + sNet.vni = vni } + return nil } -func (n *network) DataScope() datastore.DataScope { +func (n *network) DataScope() string { return datastore.GlobalScope } diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go index 995b4f194c..86a79f3979 100644 --- a/libnetwork/drivers/overlay/overlay.go +++ b/libnetwork/drivers/overlay/overlay.go @@ -6,7 +6,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/idm" @@ -84,8 +83,8 @@ func (d *driver) configure() error { provURL, urlOk := d.config[netlabel.KVProviderURL] if provOk && urlOk { - cfg := &config.DatastoreCfg{ - Client: config.DatastoreClientCfg{ + cfg := &datastore.ScopeCfg{ + Client: datastore.ScopeClientCfg{ Provider: provider.(string), Address: provURL.(string), }, @@ -94,7 +93,7 @@ func (d *driver) configure() error { if confOk { cfg.Client.Config = provConfig.(*store.Config) } - d.store, err = datastore.NewDataStore(cfg) + d.store, err = datastore.NewDataStore(datastore.GlobalScope, cfg) if err != nil { err = fmt.Errorf("failed to initialize data store: %v", err) return diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index bc398f7a64..a76cb235e7 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/options" "github.com/docker/libnetwork/types" ) @@ -107,6 +108,37 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { return nil } +func (ep *endpoint) New() datastore.KVObject { + return &endpoint{network: ep.getNetwork()} +} + +func (ep *endpoint) CopyTo(o datastore.KVObject) error { + ep.Lock() + defer ep.Unlock() + + dstEp := o.(*endpoint) + dstEp.name = ep.name + dstEp.id = ep.id + dstEp.sandboxID = ep.sandboxID + dstEp.dbIndex = ep.dbIndex + dstEp.dbExists = ep.dbExists + + if ep.iface != nil { + dstEp.iface = &endpointInterface{} + ep.iface.CopyTo(dstEp.iface) + } + + dstEp.exposedPorts = make([]types.TransportPort, len(ep.exposedPorts)) + copy(dstEp.exposedPorts, ep.exposedPorts) + + dstEp.generic = options.Generic{} + for k, v := range ep.generic { + dstEp.generic[k] = v + } + + return nil +} + func (ep *endpoint) ID() string { ep.Lock() defer ep.Unlock() @@ -122,16 +154,28 @@ func (ep *endpoint) Name() string { } func (ep *endpoint) Network() string { - return ep.getNetwork().name + if ep.network == nil { + return "" + } + + return ep.network.name } // endpoint Key structure : endpoint/network-id/endpoint-id func (ep *endpoint) Key() []string { - return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id, ep.id} + if ep.network == nil { + return nil + } + + return []string{datastore.EndpointKeyPrefix, ep.network.id, ep.id} } func (ep *endpoint) KeyPrefix() []string { - return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id} + if ep.network == nil { + return nil + } + + return []string{datastore.EndpointKeyPrefix, ep.network.id} } func (ep *endpoint) networkIDFromKey(key string) (string, error) { @@ -177,7 +221,7 @@ func (ep *endpoint) Exists() bool { } func (ep *endpoint) Skip() bool { - return ep.getNetwork().Skip() + return ep.getNetwork().Skip() || ep.DataScope() == datastore.LocalScope } func (ep *endpoint) processOptions(options ...EndpointOption) { @@ -191,8 +235,22 @@ func (ep *endpoint) processOptions(options ...EndpointOption) { } } -func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error { +func (ep *endpoint) getNetwork() *network { + ep.Lock() + defer ep.Unlock() + return ep.network +} + +func (ep *endpoint) getNetworkFromStore() (*network, error) { + if ep.network == nil { + return nil, fmt.Errorf("invalid network object in endpoint %s", ep.Name()) + } + + return ep.network.ctrlr.getNetworkFromStore(ep.network.id) +} + +func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error { if sbox == nil { return types.BadRequestErrorf("endpoint cannot be joined by nil container") } @@ -215,15 +273,27 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error { return types.BadRequestErrorf("not a valid Sandbox interface") } + network, err := ep.getNetworkFromStore() + if err != nil { + return fmt.Errorf("failed to get network from store during join: %v", err) + } + + ep, err = network.getEndpointFromStore(ep.ID()) + if err != nil { + return fmt.Errorf("failed to get endpoint from store during join: %v", err) + } + ep.Lock() if ep.sandboxID != "" { ep.Unlock() return types.ForbiddenErrorf("a sandbox has already joined the endpoint") } + ep.Unlock() + ep.Lock() + ep.network = network ep.sandboxID = sbox.ID() ep.joinInfo = &endpointJoinInfo{} - network := ep.network epid := ep.id ep.Unlock() defer func() { @@ -235,12 +305,16 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error { }() network.Lock() - driver := network.driver nid := network.id network.Unlock() ep.processOptions(options...) + driver, err := network.driver() + if err != nil { + return fmt.Errorf("failed to join endpoint: %v", err) + } + err = driver.Join(nid, epid, sbox.Key(), ep, sbox.Labels()) if err != nil { return err @@ -262,14 +336,15 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error { return err } - if err = sb.updateDNS(ep.getNetwork().enableIPv6); err != nil { + // Watch for service records + network.getController().watchSvcRecord(ep) + + if err = sb.updateDNS(network.enableIPv6); err != nil { return err } - if !ep.isLocalScoped() { - if err = network.ctrlr.updateToStore(ep); err != nil { - return err - } + if err = network.getController().updateToStore(ep); err != nil { + return err } sb.Lock() @@ -327,6 +402,16 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { return types.BadRequestErrorf("not a valid Sandbox interface") } + n, err := ep.getNetworkFromStore() + if err != nil { + return fmt.Errorf("failed to get network from store during leave: %v", err) + } + + ep, err = n.getEndpointFromStore(ep.ID()) + if err != nil { + return fmt.Errorf("failed to get endpoint from store during leave: %v", err) + } + ep.Lock() sid := ep.sandboxID ep.Unlock() @@ -342,21 +427,19 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { ep.Lock() ep.sandboxID = "" - n := ep.network + ep.network = n ep.Unlock() - n.Lock() - c := n.ctrlr - d := n.driver - n.Unlock() + if err := n.getController().updateToStore(ep); err != nil { + ep.Lock() + ep.sandboxID = sid + ep.Unlock() + return err + } - if !ep.isLocalScoped() { - if err := c.updateToStore(ep); err != nil { - ep.Lock() - ep.sandboxID = sid - ep.Unlock() - return err - } + d, err := n.driver() + if err != nil { + return fmt.Errorf("failed to leave endpoint: %v", err) } if err := d.Leave(n.id, ep.id); err != nil { @@ -367,6 +450,9 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { return err } + // unwatch for service records + n.getController().unWatchSvcRecord(ep) + if sb.needDefaultGW() { ep := sb.getEPwithoutGateway() if ep == nil { @@ -379,49 +465,48 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { func (ep *endpoint) Delete() error { var err error + n, err := ep.getNetworkFromStore() + if err != nil { + return fmt.Errorf("failed to get network during Delete: %v", err) + } + + ep, err = n.getEndpointFromStore(ep.ID()) + if err != nil { + return fmt.Errorf("failed to get endpoint from store during Delete: %v", err) + } + ep.Lock() epid := ep.id name := ep.name - n := ep.network if ep.sandboxID != "" { ep.Unlock() return &ActiveContainerError{name: name, id: epid} } - n.Lock() - ctrlr := n.ctrlr - n.Unlock() ep.Unlock() - if !ep.isLocalScoped() { - if err = ctrlr.deleteFromStore(ep); err != nil { - return err - } - } - defer func() { - if err != nil { - ep.dbExists = false - if !ep.isLocalScoped() { - if e := ctrlr.updateToStore(ep); e != nil { - log.Warnf("failed to recreate endpoint in store %s : %v", name, e) - } - } - } - }() - - // Update the endpoint count in network and update it in the datastore - n.DecEndpointCnt() - if err = ctrlr.updateToStore(n); err != nil { + if err = n.DecEndpointCnt(); err != nil { return err } defer func() { if err != nil { - n.IncEndpointCnt() - if e := ctrlr.updateToStore(n); e != nil { + if e := n.IncEndpointCnt(); e != nil { log.Warnf("failed to update network %s : %v", n.name, e) } } }() + if err = n.getController().deleteFromStore(ep); err != nil { + return err + } + defer func() { + if err != nil { + ep.dbExists = false + if e := n.getController().updateToStore(ep); e != nil { + log.Warnf("failed to recreate endpoint in store %s : %v", name, e) + } + } + }() + if err = ep.deleteEndpoint(); err != nil { return err } @@ -438,38 +523,21 @@ func (ep *endpoint) deleteEndpoint() error { epid := ep.id ep.Unlock() - n.Lock() - _, ok := n.endpoints[epid] - if !ok { - n.Unlock() - return nil + driver, err := n.driver() + if err != nil { + return fmt.Errorf("failed to delete endpoint: %v", err) } - nid := n.id - driver := n.driver - delete(n.endpoints, epid) - n.Unlock() - - if err := driver.DeleteEndpoint(nid, epid); err != nil { + if err := driver.DeleteEndpoint(n.id, epid); err != nil { if _, ok := err.(types.ForbiddenError); ok { - n.Lock() - n.endpoints[epid] = ep - n.Unlock() return err } log.Warnf("driver error deleting endpoint %s : %v", name, err) } - n.updateSvcRecord(ep, false) return nil } -func (ep *endpoint) getNetwork() *network { - ep.Lock() - defer ep.Unlock() - return ep.network -} - func (ep *endpoint) getSandbox() (*sandbox, bool) { ep.Lock() c := ep.network.getController() @@ -545,14 +613,8 @@ func JoinOptionPriority(ep Endpoint, prio int) EndpointOption { } } -func (ep *endpoint) DataScope() datastore.DataScope { - ep.Lock() - defer ep.Unlock() - return ep.network.dataScope -} - -func (ep *endpoint) isLocalScoped() bool { - return ep.DataScope() == datastore.LocalScope +func (ep *endpoint) DataScope() string { + return ep.getNetwork().DataScope() } func (ep *endpoint) assignAddress() error { diff --git a/libnetwork/endpoint_info.go b/libnetwork/endpoint_info.go index 4e25fec3af..7c765f4000 100644 --- a/libnetwork/endpoint_info.go +++ b/libnetwork/endpoint_info.go @@ -2,6 +2,7 @@ package libnetwork import ( "encoding/json" + "fmt" "net" "github.com/docker/libnetwork/driverapi" @@ -115,6 +116,21 @@ func (epi *endpointInterface) UnmarshalJSON(b []byte) error { return nil } +func (epi *endpointInterface) CopyTo(dstEpi *endpointInterface) error { + dstEpi.mac = types.GetMacCopy(epi.mac) + dstEpi.addr = types.GetIPNetCopy(epi.addr) + dstEpi.addrv6 = types.GetIPNetCopy(epi.addrv6) + dstEpi.srcName = epi.srcName + dstEpi.dstPrefix = epi.dstPrefix + dstEpi.poolID = epi.poolID + + for _, route := range epi.routes { + dstEpi.routes = append(dstEpi.routes, types.GetIPNetCopy(route)) + } + + return nil +} + type endpointJoinInfo struct { gw net.IP gw6 net.IP @@ -122,21 +138,38 @@ type endpointJoinInfo struct { } func (ep *endpoint) Info() EndpointInfo { - return ep + n, err := ep.getNetworkFromStore() + if err != nil { + return nil + } + + ep, err = n.getEndpointFromStore(ep.ID()) + if err != nil { + return nil + } + + sb, ok := ep.getSandbox() + if !ok { + // endpoint hasn't joined any sandbox. + // Just return the endpoint + return ep + } + + return sb.getEndpoint(ep.ID()) } func (ep *endpoint) DriverInfo() (map[string]interface{}, error) { - ep.Lock() - network := ep.network - epid := ep.id - ep.Unlock() + n, err := ep.getNetworkFromStore() + if err != nil { + return nil, fmt.Errorf("could not find network in store for driver info: %v", err) + } - network.Lock() - driver := network.driver - nid := network.id - network.Unlock() + driver, err := n.driver() + if err != nil { + return nil, fmt.Errorf("failed to get driver info: %v", err) + } - return driver.EndpointOperInfo(nid, epid) + return driver.EndpointOperInfo(n.ID(), ep.ID()) } func (ep *endpoint) Iface() InterfaceInfo { diff --git a/libnetwork/libnetwork_internal_test.go b/libnetwork/libnetwork_internal_test.go index 2b5e20dd8d..f59b41a79f 100644 --- a/libnetwork/libnetwork_internal_test.go +++ b/libnetwork/libnetwork_internal_test.go @@ -6,7 +6,6 @@ import ( "net" "testing" - "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/types" @@ -32,11 +31,6 @@ func TestDriverRegistration(t *testing.T) { } } -func SetTestDataStore(c NetworkController, custom datastore.DataStore) { - con := c.(*controller) - con.globalStore = custom -} - func TestNetworkMarshalling(t *testing.T) { n := &network{ name: "Miao", diff --git a/libnetwork/libnetwork_test.go b/libnetwork/libnetwork_test.go index 87436835d4..db9ec3ec1d 100644 --- a/libnetwork/libnetwork_test.go +++ b/libnetwork/libnetwork_test.go @@ -50,7 +50,7 @@ func TestMain(m *testing.M) { os.Exit(1) } - libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore())) + //libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore())) x := m.Run() controller.Stop() @@ -60,6 +60,9 @@ func TestMain(m *testing.M) { func createController() error { var err error + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + option := options.Generic{ "EnableIPForwarding": true, } @@ -358,27 +361,6 @@ func TestNilRemoteDriver(t *testing.T) { } } -func TestDuplicateNetwork(t *testing.T) { - if !testutils.IsRunningInContainer() { - defer testutils.SetupTestOSContext(t)() - } - - // Creating a default bridge name network (can't be removed) - _, err := controller.NewNetwork(bridgeNetType, "testdup") - if err != nil { - t.Fatal(err) - } - - _, err = controller.NewNetwork(bridgeNetType, "testdup") - if err == nil { - t.Fatal("Expected to fail. But instead succeeded") - } - - if _, ok := err.(libnetwork.NetworkNameError); !ok { - t.Fatalf("Did not fail with expected error. Actual error: %v", err) - } -} - func TestNetworkName(t *testing.T) { if !testutils.IsRunningInContainer() { defer testutils.SetupTestOSContext(t)() @@ -703,7 +685,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) { if netWanted == nil { t.Fatal(err) } - if net1 != netWanted { + if net1.ID() != netWanted.ID() { t.Fatal(err) } @@ -712,7 +694,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) { if netWanted == nil { t.Fatal(err) } - if net2 != netWanted { + if net2.ID() != netWanted.ID() { t.Fatal(err) } } @@ -843,7 +825,7 @@ func TestControllerQuery(t *testing.T) { if err != nil { t.Fatalf("Unexpected failure for NetworkByID(): %v", err) } - if net1 != g { + if net1.ID() != g.ID() { t.Fatalf("NetworkByID() returned unexpected element: %v", g) } @@ -863,7 +845,7 @@ func TestControllerQuery(t *testing.T) { if err != nil { t.Fatalf("Unexpected failure for NetworkByID(): %v", err) } - if net2 != g { + if net2.ID() != g.ID() { t.Fatalf("NetworkByID() returned unexpected element: %v", g) } } @@ -940,7 +922,7 @@ func TestNetworkQuery(t *testing.T) { if err != nil { t.Fatal(err) } - if ep12 != e { + if ep12.ID() != e.ID() { t.Fatalf("EndpointByID() returned %v instead of %v", e, ep12) } diff --git a/libnetwork/network.go b/libnetwork/network.go index 94a56c2fd9..f659259271 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -2,6 +2,7 @@ package libnetwork import ( "encoding/json" + "fmt" "net" "sync" @@ -127,7 +128,6 @@ type network struct { networkType string id string ipamType string - driver driverapi.Driver addrSpace string ipamV4Config []*IpamConf ipamV6Config []*IpamConf @@ -135,14 +135,14 @@ type network struct { ipamV6Info []*IpamInfo enableIPv6 bool endpointCnt uint64 - endpoints endpointTable generic options.Generic dbIndex uint64 svcRecords svcMap dbExists bool persist bool stopWatchCh chan struct{} - dataScope datastore.DataScope + scope string + drvOnce *sync.Once sync.Mutex } @@ -164,11 +164,7 @@ func (n *network) Type() string { n.Lock() defer n.Unlock() - if n.driver == nil { - return "" - } - - return n.driver.Type() + return n.networkType } func (n *network) Key() []string { @@ -220,10 +216,72 @@ func (n *network) Skip() bool { return !n.persist } -func (n *network) DataScope() datastore.DataScope { +func (n *network) New() datastore.KVObject { n.Lock() defer n.Unlock() - return n.dataScope + + return &network{ + ctrlr: n.ctrlr, + drvOnce: &sync.Once{}, + } +} + +// CopyTo deep copies to the destination IpamInfo +func (i *IpamInfo) CopyTo(dstI *IpamInfo) error { + dstI.PoolID = i.PoolID + if i.Meta != nil { + dstI.Meta = make(map[string]string) + for k, v := range i.Meta { + dstI.Meta[k] = v + } + } + + dstI.AddressSpace = i.AddressSpace + dstI.Pool = types.GetIPNetCopy(i.Pool) + dstI.Gateway = types.GetIPNetCopy(i.Gateway) + + if i.AuxAddresses != nil { + dstI.AuxAddresses = make(map[string]*net.IPNet) + for k, v := range i.AuxAddresses { + dstI.AuxAddresses[k] = types.GetIPNetCopy(v) + } + } + + return nil +} + +func (n *network) CopyTo(o datastore.KVObject) error { + n.Lock() + defer n.Unlock() + + dstN := o.(*network) + dstN.name = n.name + dstN.id = n.id + dstN.networkType = n.networkType + dstN.ipamType = n.ipamType + dstN.endpointCnt = n.endpointCnt + dstN.enableIPv6 = n.enableIPv6 + dstN.persist = n.persist + dstN.dbIndex = n.dbIndex + dstN.dbExists = n.dbExists + dstN.drvOnce = n.drvOnce + + for _, v4info := range n.ipamV4Info { + dstV4Info := &IpamInfo{} + v4info.CopyTo(dstV4Info) + dstN.ipamV4Info = append(dstN.ipamV4Info, dstV4Info) + } + + dstN.generic = options.Generic{} + for k, v := range n.generic { + dstN.generic[k] = v + } + + return nil +} + +func (n *network) DataScope() string { + return n.driverScope() } func (n *network) EndpointCnt() uint64 { @@ -232,16 +290,20 @@ func (n *network) EndpointCnt() uint64 { return n.endpointCnt } -func (n *network) IncEndpointCnt() { +func (n *network) IncEndpointCnt() error { n.Lock() n.endpointCnt++ n.Unlock() + + return n.getController().updateToStore(n) } -func (n *network) DecEndpointCnt() { +func (n *network) DecEndpointCnt() error { n.Lock() n.endpointCnt-- n.Unlock() + + return n.getController().updateToStore(n) } // TODO : Can be made much more generic with the help of reflection (but has some golang limitations) @@ -372,17 +434,55 @@ func (n *network) processOptions(options ...NetworkOption) { } } -func (n *network) Delete() error { - var err error +func (n *network) driverScope() string { + c := n.getController() - ctrlr := n.getController() - - ctrlr.Lock() - _, ok := ctrlr.networks[n.id] - ctrlr.Unlock() + c.Lock() + // Check if a driver for the specified network type is available + dd, ok := c.drivers[n.networkType] + c.Unlock() if !ok { - return &UnknownNetworkError{name: n.name, id: n.id} + var err error + dd, err = c.loadDriver(n.networkType) + if err != nil { + // If driver could not be resolved simply return an empty string + return "" + } + } + + return dd.capability.DataScope +} + +func (n *network) driver() (driverapi.Driver, error) { + c := n.getController() + + c.Lock() + // Check if a driver for the specified network type is available + dd, ok := c.drivers[n.networkType] + c.Unlock() + + if !ok { + var err error + dd, err = c.loadDriver(n.networkType) + if err != nil { + return nil, err + } + } + + return dd.driver, nil +} + +func (n *network) Delete() error { + n.Lock() + c := n.ctrlr + name := n.name + id := n.id + n.Unlock() + + n, err := c.getNetworkFromStore(id) + if err != nil { + return &UnknownNetworkError{name: name, id: id} } numEps := n.EndpointCnt() @@ -390,9 +490,22 @@ func (n *network) Delete() error { return &ActiveEndpointsError{name: n.name, id: n.id} } - // deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help - // prevent any possible race between endpoint join and network delete - if err = ctrlr.deleteFromStore(n); err != nil { + if err = n.deleteNetwork(); err != nil { + return err + } + defer func() { + if err != nil { + if e := c.addNetwork(n); e != nil { + log.Warnf("failed to rollback deleteNetwork for network %s: %v", + n.Name(), err) + } + } + }() + + // deleteFromStore performs an atomic delete operation and the + // network.endpointCnt field will help prevent any possible + // race between endpoint join and network delete + if err = n.getController().deleteFromStore(n); err != nil { if err == datastore.ErrKeyModified { return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.") } @@ -402,65 +515,68 @@ func (n *network) Delete() error { defer func() { if err != nil { n.dbExists = false - if e := ctrlr.updateToStore(n); e != nil { + if e := n.getController().updateToStore(n); e != nil { log.Warnf("failed to recreate network in store %s : %v", n.name, e) } } }() - if err = n.deleteNetwork(); err != nil { - return err - } - n.ipamRelease() return nil } func (n *network) deleteNetwork() error { - n.Lock() - id := n.id - d := n.driver - n.ctrlr.Lock() - delete(n.ctrlr.networks, id) - n.ctrlr.Unlock() - n.Unlock() + d, err := n.driver() + if err != nil { + return fmt.Errorf("failed deleting network: %v", err) + } - if err := d.DeleteNetwork(n.id); err != nil { + // If it is bridge network type make sure we call the driver about the network + // because the network may have been created in some past life of libnetwork. + if n.Type() == "bridge" { + n.drvOnce.Do(func() { + err = n.getController().addNetwork(n) + }) + if err != nil { + return err + } + } + + if err := d.DeleteNetwork(n.ID()); err != nil { // Forbidden Errors should be honored if _, ok := err.(types.ForbiddenError); ok { - n.ctrlr.Lock() - n.ctrlr.networks[n.id] = n - n.ctrlr.Unlock() return err } log.Warnf("driver error deleting network %s : %v", n.name, err) } - n.stopWatch() + return nil } func (n *network) addEndpoint(ep *endpoint) error { - var err error - n.Lock() - n.endpoints[ep.id] = ep - d := n.driver - n.Unlock() + d, err := n.driver() + if err != nil { + return fmt.Errorf("failed to add endpoint: %v", err) + } - defer func() { + // If it is bridge network type make sure we call the driver about the network + // because the network may have been created in some past life of libnetwork. + if n.Type() == "bridge" { + n.drvOnce.Do(func() { + err = n.getController().addNetwork(n) + }) if err != nil { - n.Lock() - delete(n.endpoints, ep.id) - n.Unlock() + return err } - }() + } err = d.CreateEndpoint(n.id, ep.id, ep.Interface(), ep.generic) if err != nil { - return types.InternalErrorf("failed to create endpoint %s on network %s: %v", ep.Name(), n.Name(), err) + return types.InternalErrorf("failed to create endpoint %s on network %s: %v", + ep.Name(), n.Name(), err) } - n.updateSvcRecord(ep, true) return nil } @@ -476,7 +592,16 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}} ep.id = stringid.GenerateRandomID() + + // Initialize ep.network with a possibly stale copy of n. We need this to get network from + // store. But once we get it from store we will have the most uptodate copy possible. ep.network = n + ep.network, err = ep.getNetworkFromStore() + if err != nil { + return nil, fmt.Errorf("failed to get network during CreateEndpoint: %v", err) + } + n = ep.network + ep.processOptions(options...) if err = ep.assignAddress(); err != nil { @@ -488,46 +613,46 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi } }() - ctrlr := n.getController() - - n.IncEndpointCnt() - if err = ctrlr.updateToStore(n); err != nil { - return nil, err - } - defer func() { - if err != nil { - n.DecEndpointCnt() - if err = ctrlr.updateToStore(n); err != nil { - log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err) - } - } - }() if err = n.addEndpoint(ep); err != nil { return nil, err } defer func() { if err != nil { - if e := ep.Delete(); ep != nil { + if e := ep.deleteEndpoint(); e != nil { log.Warnf("cleaning up endpoint failed %s : %v", name, e) } } }() - if !ep.isLocalScoped() { - if err = ctrlr.updateToStore(ep); err != nil { - return nil, err + if err = n.getController().updateToStore(ep); err != nil { + return nil, err + } + defer func() { + if err != nil { + if e := n.getController().deleteFromStore(ep); e != nil { + log.Warnf("error rolling back endpoint %s from store: %v", name, e) + } } + }() + + // Increment endpoint count to indicate completion of endpoint addition + if err = n.IncEndpointCnt(); err != nil { + return nil, err } return ep, nil } func (n *network) Endpoints() []Endpoint { - n.Lock() - defer n.Unlock() - list := make([]Endpoint, 0, len(n.endpoints)) - for _, e := range n.endpoints { - list = append(list, e) + var list []Endpoint + + endpoints, err := n.getEndpointsFromStore() + if err != nil { + log.Error(err) + } + + for _, ep := range endpoints { + list = append(list, ep) } return list @@ -568,28 +693,32 @@ func (n *network) EndpointByID(id string) (Endpoint, error) { if id == "" { return nil, ErrInvalidID(id) } - n.Lock() - defer n.Unlock() - if e, ok := n.endpoints[id]; ok { - return e, nil + + ep, err := n.getEndpointFromStore(id) + if err != nil { + return nil, ErrNoSuchEndpoint(id) } - return nil, ErrNoSuchEndpoint(id) + + return ep, nil } -func (n *network) isGlobalScoped() bool { - return n.DataScope() == datastore.GlobalScope -} +func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool) { + c := n.getController() + sr, ok := c.svcDb[n.ID()] + if !ok { + c.svcDb[n.ID()] = svcMap{} + sr = c.svcDb[n.ID()] + } -func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) { n.Lock() var recs []etchosts.Record if iface := ep.Iface(); iface.Address() != nil { if isAdd { - n.svcRecords[ep.Name()] = iface.Address().IP - n.svcRecords[ep.Name()+"."+n.name] = iface.Address().IP + sr[ep.Name()] = iface.Address().IP + sr[ep.Name()+"."+n.name] = iface.Address().IP } else { - delete(n.svcRecords, ep.Name()) - delete(n.svcRecords, ep.Name()+"."+n.name) + delete(sr, ep.Name()) + delete(sr, ep.Name()+"."+n.name) } recs = append(recs, etchosts.Record{ @@ -610,12 +739,11 @@ func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) { } var sbList []*sandbox - n.WalkEndpoints(func(e Endpoint) bool { - if sb, hasSandbox := e.(*endpoint).getSandbox(); hasSandbox { + for _, ep := range localEps { + if sb, hasSandbox := ep.getSandbox(); hasSandbox { sbList = append(sbList, sb) } - return false - }) + } for _, sb := range sbList { if isAdd { @@ -631,7 +759,9 @@ func (n *network) getSvcRecords() []etchosts.Record { defer n.Unlock() var recs []etchosts.Record - for h, ip := range n.svcRecords { + sr, _ := n.ctrlr.svcDb[n.id] + + for h, ip := range sr { recs = append(recs, etchosts.Record{ Hosts: h, IP: ip.String(), @@ -799,7 +929,7 @@ func (n *network) deriveAddressSpace() (string, error) { if !ok { return "", types.NotFoundErrorf("could not find ipam driver %s to get default address space", n.ipamType) } - if n.isGlobalScoped() { + if n.DataScope() == datastore.GlobalScope { return ipd.defaultGlobalAddressSpace, nil } return ipd.defaultLocalAddressSpace, nil diff --git a/libnetwork/sandbox.go b/libnetwork/sandbox.go index 303daa8d91..1125fe274b 100644 --- a/libnetwork/sandbox.go +++ b/libnetwork/sandbox.go @@ -247,6 +247,19 @@ func (sb *sandbox) getConnectedEndpoints() []*endpoint { return eps } +func (sb *sandbox) getEndpoint(id string) *endpoint { + sb.Lock() + defer sb.Unlock() + + for _, ep := range sb.endpoints { + if ep.id == id { + return ep + } + } + + return nil +} + func (sb *sandbox) updateGateway(ep *endpoint) error { sb.Lock() osSbox := sb.osSbox @@ -359,7 +372,13 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error { return nil } -func (sb *sandbox) clearNetworkResources(ep *endpoint) error { +func (sb *sandbox) clearNetworkResources(origEp *endpoint) error { + ep := sb.getEndpoint(origEp.id) + if ep == nil { + return fmt.Errorf("could not find the sandbox endpoint data for endpoint %s", + ep.name) + } + sb.Lock() osSbox := sb.osSbox sb.Unlock() @@ -837,7 +856,7 @@ func (eh epHeap) Less(i, j int) bool { cjp = 0 } if cip == cjp { - return eh[i].getNetwork().Name() < eh[j].getNetwork().Name() + return eh[i].network.Name() < eh[j].network.Name() } return cip > cjp diff --git a/libnetwork/sandbox_test.go b/libnetwork/sandbox_test.go index 0f47248eca..b17275cced 100644 --- a/libnetwork/sandbox_test.go +++ b/libnetwork/sandbox_test.go @@ -115,21 +115,21 @@ func TestSandboxAddMultiPrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep3 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() { t.Fatal("Expected ep3 to be at the top of the heap. But did not find ep3 at the top of the heap") } if err := ep3.Leave(sbx); err != nil { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep2 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() { t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap") } if err := ep2.Leave(sbx); err != nil { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep1 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() { t.Fatal("Expected ep1 to be at the top of the heap after removing ep2. But did not find ep1 at the top of the heap") } @@ -138,7 +138,7 @@ func TestSandboxAddMultiPrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep3 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() { t.Fatal("Expected ep3 to be at the top of the heap after adding ep3 back. But did not find ep3 at the top of the heap") } @@ -185,7 +185,7 @@ func TestSandboxAddSamePrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep1 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() { t.Fatal("Expected ep1 to be at the top of the heap. But did not find ep1 at the top of the heap") } @@ -193,7 +193,7 @@ func TestSandboxAddSamePrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep2 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() { t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap") } diff --git a/libnetwork/store.go b/libnetwork/store.go index 9cbbfc5a73..a45aa3b8f9 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -1,408 +1,348 @@ package libnetwork import ( - "encoding/json" "fmt" - "time" log "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" ) -var ( - defaultBoltTimeout = 3 * time.Second - defaultLocalStoreConfig = config.DatastoreCfg{ - Embedded: true, - Client: config.DatastoreClientCfg{ - Provider: "boltdb", - Address: defaultPrefix + "/boltdb.db", - Config: &store.Config{ - Bucket: "libnetwork", - ConnectionTimeout: defaultBoltTimeout, - }, - }, - } -) - -func (c *controller) validateGlobalStoreConfig() bool { - return c.cfg != nil && c.cfg.GlobalStore.Client.Provider != "" && c.cfg.GlobalStore.Client.Address != "" -} - -func (c *controller) initGlobalStore() error { +func (c *controller) initStores() error { c.Lock() - cfg := c.cfg - c.Unlock() - if !c.validateGlobalStoreConfig() { - return fmt.Errorf("globalstore initialization requires a valid configuration") - } - - store, err := datastore.NewDataStore(&cfg.GlobalStore) - if err != nil { - return err - } - c.Lock() - c.globalStore = store - c.Unlock() - return nil -} - -func (c *controller) initLocalStore() error { - c.Lock() - cfg := c.cfg - c.Unlock() - localStore, err := datastore.NewDataStore(c.getLocalStoreConfig(cfg)) - if err != nil { - return err - } - c.Lock() - c.localStore = localStore - c.Unlock() - return nil -} - -func (c *controller) restoreFromGlobalStore() error { - c.Lock() - s := c.globalStore - c.Unlock() - if s == nil { - return nil - } - c.restore("global") - return c.watchNetworks() -} - -func (c *controller) restoreFromLocalStore() error { - c.Lock() - s := c.localStore - c.Unlock() - if s != nil { - c.restore("local") - } - return nil -} - -func (c *controller) restore(store string) { - nws, err := c.getNetworksFromStore(store == "global") - if err == nil { - c.processNetworkUpdate(nws, nil) - } else if err != datastore.ErrKeyNotFound { - log.Warnf("failed to read networks from %s store during init : %v", store, err) - } -} - -func (c *controller) getNetworksFromStore(global bool) ([]*store.KVPair, error) { - var cs datastore.DataStore - c.Lock() - if global { - cs = c.globalStore - } else { - cs = c.localStore - } - c.Unlock() - return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix)) -} - -func (c *controller) newNetworkFromStore(n *network) error { - n.Lock() - n.ctrlr = c - n.endpoints = endpointTable{} - n.Unlock() - - return c.addNetwork(n) -} - -func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { - ep.Lock() - n := ep.network - id := ep.id - ep.Unlock() - - _, err := n.EndpointByID(id) - if err != nil { - if _, ok := err.(ErrNoSuchEndpoint); ok { - return n.addEndpoint(ep) - } - } - return err -} - -func (c *controller) updateToStore(kvObject datastore.KV) error { - if kvObject.Skip() { - return nil - } - cs := c.getDataStore(kvObject.DataScope()) - if cs == nil { - log.Debugf("datastore not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...)) - return nil - } - - return cs.PutObjectAtomic(kvObject) -} - -func (c *controller) deleteFromStore(kvObject datastore.KV) error { - if kvObject.Skip() { - return nil - } - cs := c.getDataStore(kvObject.DataScope()) - if cs == nil { - log.Debugf("datastore not initialized. kv object %s is not deleted from datastore", datastore.Key(kvObject.Key()...)) - return nil - } - - if err := cs.DeleteObjectAtomic(kvObject); err != nil { - return err - } - - return nil -} - -func (c *controller) watchNetworks() error { - if !c.validateGlobalStoreConfig() { - return nil - } - - c.Lock() - cs := c.globalStore - c.Unlock() - - networkKey := datastore.Key(datastore.NetworkKeyPrefix) - if err := ensureKeys(networkKey, cs); err != nil { - return fmt.Errorf("failed to ensure if the network keys are valid and present in store: %v", err) - } - nwPairs, err := cs.KVStore().WatchTree(networkKey, nil) - if err != nil { - return err - } - go func() { - for { - select { - case nws := <-nwPairs: - c.Lock() - tmpview := networkTable{} - lview := c.networks - c.Unlock() - for k, v := range lview { - if v.isGlobalScoped() { - tmpview[k] = v - } - } - c.processNetworkUpdate(nws, &tmpview) - - // Delete processing - for k := range tmpview { - c.Lock() - existing, ok := c.networks[k] - c.Unlock() - if !ok { - continue - } - tmp := network{} - if err := c.globalStore.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { - continue - } - if err := existing.deleteNetwork(); err != nil { - log.Debugf("Delete failed %s: %s", existing.name, err) - } - } - } - } - }() - return nil -} - -func (n *network) watchEndpoints() error { - if n.Skip() || !n.ctrlr.validateGlobalStoreConfig() { - return nil - } - - n.Lock() - cs := n.ctrlr.globalStore - tmp := endpoint{network: n} - n.stopWatchCh = make(chan struct{}) - stopCh := n.stopWatchCh - n.Unlock() - - endpointKey := datastore.Key(tmp.KeyPrefix()...) - if err := ensureKeys(endpointKey, cs); err != nil { - return fmt.Errorf("failed to ensure if the endpoint keys are valid and present in store: %v", err) - } - epPairs, err := cs.KVStore().WatchTree(endpointKey, stopCh) - if err != nil { - return err - } - go func() { - for { - select { - case <-stopCh: - return - case eps := <-epPairs: - n.Lock() - tmpview := endpointTable{} - lview := n.endpoints - n.Unlock() - for k, v := range lview { - if v.network.isGlobalScoped() { - tmpview[k] = v - } - } - n.ctrlr.processEndpointsUpdate(eps, &tmpview) - // Delete processing - for k := range tmpview { - n.Lock() - existing, ok := n.endpoints[k] - n.Unlock() - if !ok { - continue - } - tmp := endpoint{} - if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { - continue - } - if err := existing.deleteEndpoint(); err != nil { - log.Debugf("Delete failed %s: %s", existing.name, err) - } - } - } - } - }() - return nil -} - -func (n *network) stopWatch() { - n.Lock() - if n.stopWatchCh != nil { - close(n.stopWatchCh) - n.stopWatchCh = nil - } - n.Unlock() -} - -func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) { - for _, kve := range nws { - var n network - err := json.Unmarshal(kve.Value, &n) - if err != nil { - log.Error(err) - continue - } - if prune != nil { - delete(*prune, n.id) - } - n.SetIndex(kve.LastIndex) - c.Lock() - existing, ok := c.networks[n.id] + if c.cfg == nil { c.Unlock() - if ok { - existing.Lock() - // Skip existing network update - if existing.dbIndex != n.Index() { - // Can't use SetIndex() since existing is locked. - existing.dbIndex = n.Index() - existing.dbExists = true - existing.endpointCnt = n.endpointCnt - } - existing.Unlock() - continue - } - - if err = c.newNetworkFromStore(&n); err != nil { - log.Error(err) - } - } -} - -func (c *controller) processEndpointUpdate(ep *endpoint) bool { - nw := ep.network - if nw == nil { - return true - } - nw.Lock() - id := nw.id - nw.Unlock() - - c.Lock() - n, ok := c.networks[id] - c.Unlock() - if !ok { - return true - } - existing, _ := n.EndpointByID(ep.id) - if existing == nil { - return true - } - - ee := existing.(*endpoint) - ee.Lock() - if ee.dbIndex != ep.Index() { - // Can't use SetIndex() because ee is locked. - ee.dbIndex = ep.Index() - ee.dbExists = true - ee.sandboxID = ep.sandboxID - } - ee.Unlock() - - return false -} - -func ensureKeys(key string, cs datastore.DataStore) error { - exists, err := cs.KVStore().Exists(key) - if err != nil { - return err - } - if exists { return nil } - return cs.KVStore().Put(key, []byte{}, nil) -} + scopeConfigs := c.cfg.Scopes + c.Unlock() -func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCfg { - if cfg != nil && cfg.LocalStore.Client.Provider != "" && cfg.LocalStore.Client.Address != "" { - return &cfg.LocalStore + for scope, scfg := range scopeConfigs { + store, err := datastore.NewDataStore(scope, scfg) + if err != nil { + return err + } + c.Lock() + c.stores = append(c.stores, store) + c.Unlock() } - return &defaultLocalStoreConfig + + c.startWatch() + return nil } -func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) { +func (c *controller) closeStores() { + for _, store := range c.getStores() { + store.Close() + } +} + +func (c *controller) getStore(scope string) datastore.DataStore { c.Lock() - if dataScope == datastore.GlobalScope { - dataStore = c.globalStore - } else if dataScope == datastore.LocalScope { - dataStore = c.localStore + defer c.Unlock() + + for _, store := range c.stores { + if store.Scope() == scope { + return store + } + } + + return nil +} + +func (c *controller) getStores() []datastore.DataStore { + c.Lock() + defer c.Unlock() + + return c.stores +} + +func (c *controller) getNetworkFromStore(nid string) (*network, error) { + for _, store := range c.getStores() { + n := &network{id: nid, ctrlr: c} + err := store.GetObject(datastore.Key(n.Key()...), n) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("could not find network %s: %v", nid, err) + } + + // Continue searching in the next store if the key is not found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + return n, nil + } + + return nil, fmt.Errorf("network %s not found", nid) +} + +func (c *controller) getNetworksFromStore() ([]*network, error) { + var nl []*network + + for _, store := range c.getStores() { + kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), + &network{ctrlr: c}) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("failed to get networks for scope %s: %v", + store.Scope(), err) + } + + // Continue searching in the next store if no keys found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + for _, kvo := range kvol { + n := kvo.(*network) + n.ctrlr = c + nl = append(nl, n) + } + } + + return nl, nil +} + +func (n *network) getEndpointFromStore(eid string) (*endpoint, error) { + for _, store := range n.ctrlr.getStores() { + ep := &endpoint{id: eid, network: n} + err := store.GetObject(datastore.Key(ep.Key()...), ep) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("could not find endpoint %s: %v", eid, err) + } + + // Continue searching in the next store if the key is not found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + return ep, nil + } + + return nil, fmt.Errorf("endpoint %s not found", eid) +} + +func (n *network) getEndpointsFromStore() ([]*endpoint, error) { + var epl []*endpoint + + tmp := endpoint{network: n} + for _, store := range n.getController().getStores() { + kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n}) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, + fmt.Errorf("failed to get endpoints for network %s scope %s: %v", + n.Name(), store.Scope(), err) + } + + // Continue searching in the next store if no keys found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + for _, kvo := range kvol { + ep := kvo.(*endpoint) + ep.network = n + epl = append(epl, ep) + } + } + + return epl, nil +} + +func (c *controller) updateToStore(kvObject datastore.KVObject) error { + cs := c.getStore(kvObject.DataScope()) + if cs == nil { + log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...)) + return nil + } + + if err := cs.PutObjectAtomic(kvObject); err != nil { + return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err) + } + + return nil +} + +func (c *controller) deleteFromStore(kvObject datastore.KVObject) error { + cs := c.getStore(kvObject.DataScope()) + if cs == nil { + log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...)) + return nil + } + +retry: + if err := cs.DeleteObjectAtomic(kvObject); err != nil { + if err == datastore.ErrKeyModified { + if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil { + return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err) + } + goto retry + } + return err + } + + return nil +} + +type netWatch struct { + localEps map[string]*endpoint + remoteEps map[string]*endpoint + stopCh chan struct{} +} + +func (c *controller) getLocalEps(nw *netWatch) []*endpoint { + c.Lock() + defer c.Unlock() + + var epl []*endpoint + for _, ep := range nw.localEps { + epl = append(epl, ep) + } + + return epl +} + +func (c *controller) watchSvcRecord(ep *endpoint) { + c.watchCh <- ep +} + +func (c *controller) unWatchSvcRecord(ep *endpoint) { + c.unWatchCh <- ep +} + +func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) { + for { + select { + case <-nw.stopCh: + return + case o := <-nCh: + n := o.(*network) + + epl, err := n.getEndpointsFromStore() + if err != nil { + break + } + + c.Lock() + var addEp []*endpoint + + delEpMap := make(map[string]*endpoint) + for k, v := range nw.remoteEps { + delEpMap[k] = v + } + + for _, lEp := range epl { + if _, ok := nw.localEps[lEp.ID()]; ok { + continue + } + + if _, ok := nw.remoteEps[lEp.ID()]; ok { + delete(delEpMap, lEp.ID()) + continue + } + + nw.remoteEps[lEp.ID()] = lEp + addEp = append(addEp, lEp) + + } + c.Unlock() + + for _, lEp := range addEp { + ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true) + } + + for _, lEp := range delEpMap { + ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false) + + } + } + } +} + +func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) { + c.Lock() + nw, ok := nmap[ep.getNetwork().ID()] + c.Unlock() + + if ok { + // Update the svc db for the local endpoint join right away + ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true) + + c.Lock() + nw.localEps[ep.ID()] = ep + c.Unlock() + return + } + + nw = &netWatch{ + localEps: make(map[string]*endpoint), + remoteEps: make(map[string]*endpoint), + } + + // Update the svc db for the local endpoint join right away + // Do this before adding this ep to localEps so that we don't + // try to update this ep's container's svc records + ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true) + + c.Lock() + nw.localEps[ep.ID()] = ep + nmap[ep.getNetwork().ID()] = nw + nw.stopCh = make(chan struct{}) + c.Unlock() + + store := c.getStore(ep.getNetwork().DataScope()) + if store == nil { + return + } + + if !store.Watchable() { + return + } + + ch, err := store.Watch(ep.getNetwork(), nw.stopCh) + if err != nil { + log.Warnf("Error creating watch for network: %v", err) + return + } + + go c.networkWatchLoop(nw, ep, ch) +} + +func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) { + c.Lock() + nw, ok := nmap[ep.getNetwork().ID()] + + if ok { + delete(nw.localEps, ep.ID()) + c.Unlock() + + // Update the svc db about local endpoint leave right away + // Do this after we remove this ep from localEps so that we + // don't try to remove this svc record from this ep's container. + ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), false) + + c.Lock() + if len(nw.localEps) == 0 { + close(nw.stopCh) + delete(nmap, ep.getNetwork().ID()) + } } c.Unlock() - return } -func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) { - for _, epe := range eps { - var ep endpoint - err := json.Unmarshal(epe.Value, &ep) - if err != nil { - log.Error(err) - continue - } - if prune != nil { - delete(*prune, ep.id) - } - ep.SetIndex(epe.LastIndex) - if nid, err := ep.networkIDFromKey(epe.Key); err != nil { - log.Error(err) - continue - } else { - if n, err := c.NetworkByID(nid); err != nil { - log.Error(err) - continue - } else { - ep.network = n.(*network) - } - } - if c.processEndpointUpdate(&ep) { - err = c.newEndpointFromStore(epe.Key, &ep) - if err != nil { - log.Error(err) - } +func (c *controller) watchLoop(nmap map[string]*netWatch) { + for { + select { + case ep := <-c.watchCh: + c.processEndpointCreate(nmap, ep) + case ep := <-c.unWatchCh: + c.processEndpointDelete(nmap, ep) } } } + +func (c *controller) startWatch() { + c.watchCh = make(chan *endpoint) + c.unWatchCh = make(chan *endpoint) + nmap := make(map[string]*netWatch) + + go c.watchLoop(nmap) +} diff --git a/libnetwork/store_test.go b/libnetwork/store_test.go index 63777c001e..f5e0871b32 100644 --- a/libnetwork/store_test.go +++ b/libnetwork/store_test.go @@ -33,7 +33,7 @@ func testNewController(t *testing.T, provider, url string) (NetworkController, e } func TestBoltdbBackend(t *testing.T) { - defer os.Remove(defaultLocalStoreConfig.Client.Address) + defer os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) testLocalBackend(t, "", "", nil) defer os.Remove("/tmp/boltdb.db") config := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second} @@ -64,7 +64,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.(*controller).localStore.KVStore() + store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore() if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); !exists || err != nil { t.Fatalf("Network key should have been created.") } @@ -100,7 +100,7 @@ func TestNoPersist(t *testing.T) { if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.(*controller).localStore.KVStore() + store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore() if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); exists { t.Fatalf("Network with persist=false should not be stored in KV Store") } @@ -138,12 +138,8 @@ func TestLocalStoreLockTimeout(t *testing.T) { } defer ctrl1.Stop() // Use the same boltdb file without closing the previous controller - ctrl2, err := New(cfgOptions...) - if err != nil { - t.Fatalf("Error new controller: %v", err) - } - store := ctrl2.(*controller).localStore - if store != nil { - t.Fatalf("localstore is expected to be nil") + _, err = New(cfgOptions...) + if err == nil { + t.Fatalf("Expected to fail but succeeded") } } From a13f78369f2af8d4694c9725d250da36e07e08cf Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Mon, 5 Oct 2015 04:24:44 -0700 Subject: [PATCH 3/4] IPAM watch removal and multistore support Remove the need for watching for IPAM data structures and add multi store support code and data reorganization to simplify address space management. Signed-off-by: Jana Radhakrishnan --- libnetwork/bitseq/sequence.go | 9 +- libnetwork/bitseq/store.go | 65 ++++---- libnetwork/ipam/allocator.go | 248 +++++++++++++++++------------- libnetwork/ipam/allocator_test.go | 142 +++++++++++++---- libnetwork/ipam/store.go | 137 ++++++++--------- libnetwork/ipam/structures.go | 141 ++++++++++++----- 6 files changed, 456 insertions(+), 286 deletions(-) diff --git a/libnetwork/bitseq/sequence.go b/libnetwork/bitseq/sequence.go index 1120ac77c8..20c50c91ed 100644 --- a/libnetwork/bitseq/sequence.go +++ b/libnetwork/bitseq/sequence.go @@ -57,9 +57,6 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32 return h, nil } - // Register for status changes - h.watchForChanges() - // Get the initial status from the ds if present. if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound { return nil, err @@ -252,6 +249,12 @@ func (h *Handle) set(ordinal, start, end uint32, any bool, release bool) (uint32 ) for { + if h.store != nil { + if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound { + return ret, err + } + } + h.Lock() // Get position if available if release { diff --git a/libnetwork/bitseq/store.go b/libnetwork/bitseq/store.go index 8012a413d2..ef7fe33400 100644 --- a/libnetwork/bitseq/store.go +++ b/libnetwork/bitseq/store.go @@ -70,46 +70,47 @@ func (h *Handle) Exists() bool { return h.dbExists } +// New method returns a handle based on the receiver handle +func (h *Handle) New() datastore.KVObject { + h.Lock() + defer h.Unlock() + + return &Handle{ + app: h.app, + id: h.id, + store: h.store, + } +} + +// CopyTo deep copies the handle into the passed destination object +func (h *Handle) CopyTo(o datastore.KVObject) error { + h.Lock() + defer h.Unlock() + + dstH := o.(*Handle) + dstH.bits = h.bits + dstH.unselected = h.unselected + dstH.head = h.head.getCopy() + dstH.app = h.app + dstH.id = h.id + dstH.dbIndex = h.dbIndex + dstH.dbExists = h.dbExists + dstH.store = h.store + + return nil +} + // Skip provides a way for a KV Object to avoid persisting it in the KV Store func (h *Handle) Skip() bool { return false } // DataScope method returns the storage scope of the datastore -func (h *Handle) DataScope() datastore.DataScope { - return datastore.GlobalScope -} - -func (h *Handle) watchForChanges() error { +func (h *Handle) DataScope() string { h.Lock() - store := h.store - h.Unlock() + defer h.Unlock() - if store == nil { - return nil - } - - kvpChan, err := store.KVStore().Watch(datastore.Key(h.Key()...), nil) - if err != nil { - return err - } - go func() { - for { - select { - case kvPair := <-kvpChan: - // Only process remote update - if kvPair != nil && (kvPair.LastIndex != h.Index()) { - err := h.fromDsValue(kvPair.Value) - if err != nil { - log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error()) - } else { - h.SetIndex(kvPair.LastIndex) - } - } - } - } - }() - return nil + return h.store.Scope() } func (h *Handle) fromDsValue(value []byte) error { diff --git a/libnetwork/ipam/allocator.go b/libnetwork/ipam/allocator.go index 90dfb4a462..087c5ccf90 100644 --- a/libnetwork/ipam/allocator.go +++ b/libnetwork/ipam/allocator.go @@ -6,7 +6,6 @@ import ( "sync" log "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" "github.com/docker/libnetwork/bitseq" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/ipamapi" @@ -30,13 +29,10 @@ const ( type Allocator struct { // Predefined pools for default address spaces predefined map[string][]*net.IPNet - // Static subnet information - localSubnets *PoolsConfig - globalSubnets *PoolsConfig + addrSpaces map[string]*addrSpace + // stores []datastore.Datastore // Allocated addresses in each address space's subnet addresses map[SubnetKey]*bitseq.Handle - // Datastore - addrSpace2Configs map[string]*PoolsConfig sync.Mutex } @@ -44,73 +40,86 @@ type Allocator struct { func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) { a := &Allocator{} - a.localSubnets = &PoolsConfig{ - subnets: map[SubnetKey]*PoolData{}, - id: dsConfigKey + "/Pools", - scope: datastore.LocalScope, - ds: lcDs, - alloc: a, - } - - a.globalSubnets = &PoolsConfig{ - subnets: map[SubnetKey]*PoolData{}, - id: dsConfigKey + "/Pools", - scope: datastore.GlobalScope, - ds: glDs, - alloc: a, - } - + // Load predefined subnet pools a.predefined = map[string][]*net.IPNet{ localAddressSpace: initLocalPredefinedPools(), globalAddressSpace: initGlobalPredefinedPools(), } - a.addrSpace2Configs = map[string]*PoolsConfig{ - localAddressSpace: a.localSubnets, - globalAddressSpace: a.globalSubnets, - } - + // Initialize bitseq map a.addresses = make(map[SubnetKey]*bitseq.Handle) - cfgs := []struct { - cfg *PoolsConfig - dsc string + // Initialize address spaces + a.addrSpaces = make(map[string]*addrSpace) + for _, aspc := range []struct { + as string + ds datastore.DataStore }{ - {a.localSubnets, "local"}, - {a.globalSubnets, "global"}, - } - // Get the initial local/global pools configfrom the datastores - var inserterList []func() error - for _, e := range cfgs { - if e.cfg.ds == nil { + {localAddressSpace, lcDs}, + {globalAddressSpace, glDs}, + } { + if aspc.ds == nil { continue } - if err := e.cfg.watchForChanges(); err != nil { - log.Warnf("Error on registering watch for %s datastore: %v", e.dsc, err) - } - if err := e.cfg.readFromStore(); err != nil && err != store.ErrKeyNotFound { - return nil, fmt.Errorf("failed to retrieve the ipam %s pools config from datastore: %v", e.dsc, err) - } - e.cfg.Lock() - for k, v := range e.cfg.subnets { - if v.Range == nil { - inserterList = append(inserterList, func() error { return a.insertBitMask(e.cfg.ds, k, v.Pool) }) - } - } - e.cfg.Unlock() - } - // Add the bitmasks (data could come from datastore) - if inserterList != nil { - for _, f := range inserterList { - if err := f(); err != nil { - return nil, err - } + + a.addrSpaces[aspc.as] = &addrSpace{ + subnets: map[SubnetKey]*PoolData{}, + id: dsConfigKey + "/" + aspc.as, + scope: aspc.ds.Scope(), + ds: aspc.ds, + alloc: a, } } return a, nil } +func (a *Allocator) refresh(as string) error { + aSpace, err := a.getAddressSpaceFromStore(as) + if err != nil { + return fmt.Errorf("error getting pools config from store during init: %v", + err) + } + + if aSpace == nil { + return nil + } + + if err := a.updateBitMasks(aSpace); err != nil { + return fmt.Errorf("error updating bit masks during init: %v", err) + } + + a.Lock() + a.addrSpaces[as] = aSpace + a.Unlock() + + return nil +} + +func (a *Allocator) updateBitMasks(aSpace *addrSpace) error { + var inserterList []func() error + + aSpace.Lock() + for k, v := range aSpace.subnets { + if v.Range == nil { + inserterList = append(inserterList, + func() error { return a.insertBitMask(k, v.Pool) }) + } + } + aSpace.Unlock() + + // Add the bitmasks (data could come from datastore) + if inserterList != nil { + for _, f := range inserterList { + if err := f(); err != nil { + return err + } + } + } + + return nil +} + // GetDefaultAddressSpaces returns the local and global default address spaces func (a *Allocator) GetDefaultAddressSpaces() (string, string, error) { return localAddressSpace, globalAddressSpace, nil @@ -123,25 +132,29 @@ func (a *Allocator) RequestPool(addressSpace, pool, subPool string, options map[ return "", nil, nil, ipamapi.ErrInvalidPool } - cfg, err := a.getPoolsConfig(addressSpace) +retry: + if err := a.refresh(addressSpace); err != nil { + return "", nil, nil, err + } + + aSpace, err := a.getAddrSpace(addressSpace) if err != nil { return "", nil, nil, err } -retry: - insert, err := cfg.updatePoolDBOnAdd(*k, nw, ipr) + insert, err := aSpace.updatePoolDBOnAdd(*k, nw, ipr) if err != nil { return "", nil, nil, err } - if err := cfg.writeToStore(); err != nil { + + if err := a.writeToStore(aSpace); err != nil { if _, ok := err.(types.RetryError); !ok { return "", nil, nil, types.InternalErrorf("pool configuration failed because of %s", err.Error()) } - if erru := cfg.readFromStore(); erru != nil { - return "", nil, nil, fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err) - } + goto retry } + return k.String(), aw, nil, insert() } @@ -152,23 +165,25 @@ func (a *Allocator) ReleasePool(poolID string) error { return types.BadRequestErrorf("invalid pool id: %s", poolID) } - cfg, err := a.getPoolsConfig(k.AddressSpace) +retry: + if err := a.refresh(k.AddressSpace); err != nil { + return err + } + + aSpace, err := a.getAddrSpace(k.AddressSpace) if err != nil { return err } -retry: - remove, err := cfg.updatePoolDBOnRemoval(k) + remove, err := aSpace.updatePoolDBOnRemoval(k) if err != nil { return err } - if err = cfg.writeToStore(); err != nil { + + if err = a.writeToStore(aSpace); err != nil { if _, ok := err.(types.RetryError); !ok { return types.InternalErrorf("pool (%s) removal failed because of %v", poolID, err) } - if erru := cfg.readFromStore(); erru != nil { - return fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err) - } goto retry } @@ -177,14 +192,14 @@ retry: // Given the address space, returns the local or global PoolConfig based on the // address space is local or global. AddressSpace locality is being registered with IPAM out of band. -func (a *Allocator) getPoolsConfig(addrSpace string) (*PoolsConfig, error) { +func (a *Allocator) getAddrSpace(as string) (*addrSpace, error) { a.Lock() defer a.Unlock() - cfg, ok := a.addrSpace2Configs[addrSpace] + aSpace, ok := a.addrSpaces[as] if !ok { - return nil, types.BadRequestErrorf("cannot find locality of address space: %s", addrSpace) + return nil, types.BadRequestErrorf("cannot find locality of address space: %s", as) } - return cfg, nil + return aSpace, nil } func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool) (*SubnetKey, *net.IPNet, *net.IPNet, *AddressRange, error) { @@ -224,8 +239,14 @@ func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool return &SubnetKey{AddressSpace: addressSpace, Subnet: nw.String(), ChildSubnet: subPool}, nw, aw, ipr, nil } -func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool *net.IPNet) error { +func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error { log.Debugf("Inserting bitmask (%s, %s)", key.String(), pool.String()) + + store := a.getStore(key.AddressSpace) + if store == nil { + return fmt.Errorf("could not find store for address space %s while inserting bit mask", key.AddressSpace) + } + ipVer := getAddressVersion(pool.IP) ones, bits := pool.Mask.Size() numAddresses := uint32(1 << uint(bits-ones)) @@ -252,13 +273,13 @@ func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool return nil } -func (a *Allocator) retrieveBitmask(ds datastore.DataStore, k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) { +func (a *Allocator) retrieveBitmask(k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) { a.Lock() bm, ok := a.addresses[k] a.Unlock() if !ok { log.Debugf("Retrieving bitmask (%s, %s)", k.String(), n.String()) - if err := a.insertBitMask(ds, k, n); err != nil { + if err := a.insertBitMask(k, n); err != nil { return nil, fmt.Errorf("could not find bitmask in datastore for %s", k.String()) } a.Lock() @@ -289,7 +310,7 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error) return nil, fmt.Errorf("no default pool availbale for non-default addresss spaces") } - cfg, err := a.getPoolsConfig(as) + aSpace, err := a.getAddrSpace(as) if err != nil { return nil, err } @@ -298,14 +319,14 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error) if v != getAddressVersion(nw.IP) { continue } - cfg.Lock() - _, ok := cfg.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}] - cfg.Unlock() + aSpace.Lock() + _, ok := aSpace.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}] + aSpace.Unlock() if ok { continue } - if !cfg.contains(as, nw) { + if !aSpace.contains(as, nw) { if as == localAddressSpace { if err := netutils.CheckRouteOverlaps(nw); err == nil { return nw, nil @@ -326,31 +347,35 @@ func (a *Allocator) RequestAddress(poolID string, prefAddress net.IP, opts map[s return nil, nil, types.BadRequestErrorf("invalid pool id: %s", poolID) } - cfg, err := a.getPoolsConfig(k.AddressSpace) + if err := a.refresh(k.AddressSpace); err != nil { + return nil, nil, err + } + + aSpace, err := a.getAddrSpace(k.AddressSpace) if err != nil { return nil, nil, err } - cfg.Lock() - p, ok := cfg.subnets[k] + aSpace.Lock() + p, ok := aSpace.subnets[k] if !ok { - cfg.Unlock() + aSpace.Unlock() return nil, nil, types.NotFoundErrorf("cannot find address pool for poolID:%s", poolID) } if prefAddress != nil && !p.Pool.Contains(prefAddress) { - cfg.Unlock() + aSpace.Unlock() return nil, nil, ipamapi.ErrIPOutOfRange } c := p for c.Range != nil { k = c.ParentKey - c, ok = cfg.subnets[k] + c, ok = aSpace.subnets[k] } - cfg.Unlock() + aSpace.Unlock() - bm, err := a.retrieveBitmask(cfg.ds, k, c.Pool) + bm, err := a.retrieveBitmask(k, c.Pool) if err != nil { return nil, nil, fmt.Errorf("could not find bitmask in datastore for %s on address %v request from pool %s: %v", k.String(), prefAddress, poolID, err) @@ -370,29 +395,33 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error { return types.BadRequestErrorf("invalid pool id: %s", poolID) } - cfg, err := a.getPoolsConfig(k.AddressSpace) + if err := a.refresh(k.AddressSpace); err != nil { + return err + } + + aSpace, err := a.getAddrSpace(k.AddressSpace) if err != nil { return err } - cfg.Lock() - p, ok := cfg.subnets[k] + aSpace.Lock() + p, ok := aSpace.subnets[k] if !ok { - cfg.Unlock() + aSpace.Unlock() return ipamapi.ErrBadPool } if address == nil || !p.Pool.Contains(address) { - cfg.Unlock() + aSpace.Unlock() return ipamapi.ErrInvalidRequest } c := p for c.Range != nil { k = c.ParentKey - c = cfg.subnets[k] + c = aSpace.subnets[k] } - cfg.Unlock() + aSpace.Unlock() mask := p.Pool.Mask if p.Range != nil { @@ -403,7 +432,7 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error { return fmt.Errorf("failed to release address %s: %v", address.String(), err) } - bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool) + bm, err := a.retrieveBitmask(k, c.Pool) if err != nil { return fmt.Errorf("could not find bitmask in datastore for %s on address %v release from pool %s: %v", k.String(), address, poolID, err) @@ -449,23 +478,20 @@ func (a *Allocator) DumpDatabase() string { a.Lock() defer a.Unlock() - s := fmt.Sprintf("\n\nLocal Pool Config") - a.localSubnets.Lock() - for k, config := range a.localSubnets.subnets { - s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config)) + var s string + for as, aSpace := range a.addrSpaces { + s = fmt.Sprintf("\n\n%s Config", as) + aSpace.Lock() + for k, config := range aSpace.subnets { + s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config)) + } + aSpace.Unlock() } - a.localSubnets.Unlock() - - s = fmt.Sprintf("%s\n\nGlobal Pool Config", s) - a.globalSubnets.Lock() - for k, config := range a.globalSubnets.subnets { - s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config)) - } - a.globalSubnets.Unlock() s = fmt.Sprintf("%s\n\nBitmasks", s) for k, bm := range a.addresses { s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n\t%s: %s\n\t%d", k, bm, bm.Unselected())) } + return s } diff --git a/libnetwork/ipam/allocator_test.go b/libnetwork/ipam/allocator_test.go index c6f5573df2..bf5132ee1d 100644 --- a/libnetwork/ipam/allocator_test.go +++ b/libnetwork/ipam/allocator_test.go @@ -11,7 +11,6 @@ import ( "github.com/docker/libkv/store" "github.com/docker/libnetwork/bitseq" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/netutils" @@ -32,9 +31,9 @@ func randomLocalStore() (datastore.DataStore, error) { if err := tmp.Close(); err != nil { return nil, fmt.Errorf("Error closing temp file: %v", err) } - return datastore.NewDataStore(&config.DatastoreCfg{ + return datastore.NewDataStore(datastore.LocalScope, &datastore.ScopeCfg{ Embedded: true, - Client: config.DatastoreClientCfg{ + Client: datastore.ScopeClientCfg{ Provider: "boltdb", Address: defaultPrefix + tmp.Name(), Config: &store.Config{ @@ -191,7 +190,11 @@ func TestSubnetsMarshal(t *testing.T) { t.Fatal(err) } - cfg := a.localSubnets + cfg, err := a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + ba := cfg.Value() if err := cfg.SetValue(ba); err != nil { t.Fatal(err) @@ -221,7 +224,7 @@ func TestAddSubnets(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["abc"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["abc"] = a.addrSpaces[localAddressSpace] pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false) if err != nil { @@ -290,7 +293,13 @@ func TestAddReleasePoolID(t *testing.T) { if err != nil { t.Fatal(err) } - subnets := a.localSubnets.subnets + + aSpace, err := a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets := aSpace.subnets pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false) if err != nil { t.Fatalf("Unexpected failure in adding pool") @@ -298,6 +307,14 @@ func TestAddReleasePoolID(t *testing.T) { if err := k0.FromString(pid0); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets + if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -309,6 +326,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := k1.FromString(pid1); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k1].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k1, subnets[k1].RefCount) } @@ -323,6 +347,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := k2.FromString(pid2); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k2].RefCount != 2 { t.Fatalf("Unexpected ref count for %s: %d", k2, subnets[k2].RefCount) } @@ -334,12 +365,26 @@ func TestAddReleasePoolID(t *testing.T) { if err := a.ReleasePool(pid1); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 2 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } if err := a.ReleasePool(pid0); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -351,6 +396,13 @@ func TestAddReleasePoolID(t *testing.T) { if pid00 != pid0 { t.Fatalf("main pool should still exist") } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 2 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -358,6 +410,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := a.ReleasePool(pid2); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -365,6 +424,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := a.ReleasePool(pid00); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if bp, ok := subnets[k0]; ok { t.Fatalf("Base pool %s is still present: %v", k0, bp) } @@ -373,6 +439,13 @@ func TestAddReleasePoolID(t *testing.T) { if err != nil { t.Fatalf("Unexpected failure in adding pool") } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -417,18 +490,6 @@ func TestPredefinedPool(t *testing.T) { if nw != a.predefined[localAddressSpace][i] { t.Fatalf("Unexpected default network returned: %s", nw) } - - i, available, err = getFirstAvailablePool(a, globalAddressSpace, 2) - if err != nil { - t.Skip(err) - } - nw, err = a.getPredefinedPool(globalAddressSpace, false) - if err != nil { - t.Fatal(err) - } - if nw != available { - t.Fatalf("Unexpected default network returned: %s", nw) - } } func getFirstAvailablePool(a *Allocator, as string, atLeast int) (int, *net.IPNet, error) { @@ -475,7 +536,13 @@ func TestRemoveSubnet(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["splane"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["splane"] = &addrSpace{ + id: dsConfigKey + "/" + "splane", + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } input := []struct { addrSpace string @@ -512,7 +579,13 @@ func TestGetSameAddress(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["giallo"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["giallo"] = &addrSpace{ + id: dsConfigKey + "/" + "giallo", + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } pid, _, _, err := a.RequestPool("giallo", "192.168.100.0/24", "", nil, false) if err != nil { @@ -536,7 +609,13 @@ func TestRequestReleaseAddressFromSubPool(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["rosso"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["rosso"] = &addrSpace{ + id: dsConfigKey + "/" + "rosso", + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } poolID, _, _, err := a.RequestPool("rosso", "172.28.0.0/16", "172.28.30.0/24", nil, false) if err != nil { @@ -615,17 +694,23 @@ func TestGetAddress(t *testing.T) { func TestRequestSyntaxCheck(t *testing.T) { var ( - pool = "192.168.0.0/16" - subPool = "192.168.0.0/24" - addrSpace = "green" - err error + pool = "192.168.0.0/16" + subPool = "192.168.0.0/24" + as = "green" + err error ) a, err := getAllocator() if err != nil { t.Fatal(err) } - a.addrSpace2Configs[addrSpace] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces[as] = &addrSpace{ + id: dsConfigKey + "/" + as, + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } _, _, _, err = a.RequestPool("", pool, "", nil, false) if err == nil { @@ -637,12 +722,12 @@ func TestRequestSyntaxCheck(t *testing.T) { t.Fatalf("Failed to detect wrong request: empty address space") } - _, _, _, err = a.RequestPool(addrSpace, "", subPool, nil, false) + _, _, _, err = a.RequestPool(as, "", subPool, nil, false) if err == nil { t.Fatalf("Failed to detect wrong request: subPool specified and no pool") } - pid, _, _, err := a.RequestPool(addrSpace, pool, subPool, nil, false) + pid, _, _, err := a.RequestPool(as, pool, subPool, nil, false) if err != nil { t.Fatalf("Unexpected failure: %v", err) } @@ -764,6 +849,7 @@ func TestRelease(t *testing.T) { for i, inp := range toRelease { ip0 := net.ParseIP(inp.address) a.ReleaseAddress(pid, ip0) + bm = a.addresses[SubnetKey{localAddressSpace, subnet, ""}] if bm.Unselected() != 1 { t.Fatalf("Failed to update free address count after release. Expected %d, Found: %d", i+1, bm.Unselected()) } diff --git a/libnetwork/ipam/store.go b/libnetwork/ipam/store.go index 7dc92d1570..f288fca05e 100644 --- a/libnetwork/ipam/store.go +++ b/libnetwork/ipam/store.go @@ -2,30 +2,30 @@ package ipam import ( "encoding/json" + "fmt" log "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/types" ) // Key provides the Key to be used in KV Store -func (cfg *PoolsConfig) Key() []string { - cfg.Lock() - defer cfg.Unlock() - return []string{cfg.id} +func (aSpace *addrSpace) Key() []string { + aSpace.Lock() + defer aSpace.Unlock() + return []string{aSpace.id} } // KeyPrefix returns the immediate parent key that can be used for tree walk -func (cfg *PoolsConfig) KeyPrefix() []string { - cfg.Lock() - defer cfg.Unlock() +func (aSpace *addrSpace) KeyPrefix() []string { + aSpace.Lock() + defer aSpace.Unlock() return []string{dsConfigKey} } // Value marshals the data to be stored in the KV store -func (cfg *PoolsConfig) Value() []byte { - b, err := json.Marshal(cfg) +func (aSpace *addrSpace) Value() []byte { + b, err := json.Marshal(aSpace) if err != nil { log.Warnf("Failed to marshal ipam configured pools: %v", err) return nil @@ -34,97 +34,94 @@ func (cfg *PoolsConfig) Value() []byte { } // SetValue unmarshalls the data from the KV store. -func (cfg *PoolsConfig) SetValue(value []byte) error { - rc := &PoolsConfig{subnets: make(map[SubnetKey]*PoolData)} +func (aSpace *addrSpace) SetValue(value []byte) error { + rc := &addrSpace{subnets: make(map[SubnetKey]*PoolData)} if err := json.Unmarshal(value, rc); err != nil { return err } - cfg.subnets = rc.subnets + aSpace.subnets = rc.subnets return nil } // Index returns the latest DB Index as seen by this object -func (cfg *PoolsConfig) Index() uint64 { - cfg.Lock() - defer cfg.Unlock() - return cfg.dbIndex +func (aSpace *addrSpace) Index() uint64 { + aSpace.Lock() + defer aSpace.Unlock() + return aSpace.dbIndex } // SetIndex method allows the datastore to store the latest DB Index into this object -func (cfg *PoolsConfig) SetIndex(index uint64) { - cfg.Lock() - cfg.dbIndex = index - cfg.dbExists = true - cfg.Unlock() +func (aSpace *addrSpace) SetIndex(index uint64) { + aSpace.Lock() + aSpace.dbIndex = index + aSpace.dbExists = true + aSpace.Unlock() } // Exists method is true if this object has been stored in the DB. -func (cfg *PoolsConfig) Exists() bool { - cfg.Lock() - defer cfg.Unlock() - return cfg.dbExists +func (aSpace *addrSpace) Exists() bool { + aSpace.Lock() + defer aSpace.Unlock() + return aSpace.dbExists } // Skip provides a way for a KV Object to avoid persisting it in the KV Store -func (cfg *PoolsConfig) Skip() bool { +func (aSpace *addrSpace) Skip() bool { return false } -func (cfg *PoolsConfig) watchForChanges() error { - if cfg.ds == nil { - return nil - } - kvpChan, err := cfg.ds.KVStore().Watch(datastore.Key(cfg.Key()...), nil) - if err != nil { - return err - } - go func() { - for { - select { - case kvPair := <-kvpChan: - if kvPair != nil { - cfg.readFromKey(kvPair) - } - } - } - }() - return nil +func (a *Allocator) getStore(as string) datastore.DataStore { + a.Lock() + defer a.Unlock() + + return a.addrSpaces[as].ds } -func (cfg *PoolsConfig) writeToStore() error { - if cfg.ds == nil { - return nil +func (a *Allocator) getAddressSpaceFromStore(as string) (*addrSpace, error) { + store := a.getStore(as) + if store == nil { + return nil, fmt.Errorf("store for address space %s not found", as) } - err := cfg.ds.PutObjectAtomic(cfg) + + pc := &addrSpace{id: dsConfigKey + "/" + as, ds: store, alloc: a} + if err := store.GetObject(datastore.Key(pc.Key()...), pc); err != nil { + if err == datastore.ErrKeyNotFound { + return nil, nil + } + + return nil, fmt.Errorf("could not get pools config from store: %v", err) + } + + return pc, nil +} + +func (a *Allocator) writeToStore(aSpace *addrSpace) error { + store := aSpace.store() + if store == nil { + return fmt.Errorf("invalid store while trying to write %s address space", aSpace.DataScope()) + } + + err := store.PutObjectAtomic(aSpace) if err == datastore.ErrKeyModified { return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err) } + return err } -func (cfg *PoolsConfig) readFromStore() error { - if cfg.ds == nil { - return nil +func (a *Allocator) deleteFromStore(aSpace *addrSpace) error { + store := aSpace.store() + if store == nil { + return fmt.Errorf("invalid store while trying to delete %s address space", aSpace.DataScope()) } - return cfg.ds.GetObject(datastore.Key(cfg.Key()...), cfg) -} -func (cfg *PoolsConfig) readFromKey(kvPair *store.KVPair) { - if cfg.dbIndex < kvPair.LastIndex { - cfg.SetValue(kvPair.Value) - cfg.dbIndex = kvPair.LastIndex - cfg.dbExists = true - } -} - -func (cfg *PoolsConfig) deleteFromStore() error { - if cfg.ds == nil { - return nil - } - return cfg.ds.DeleteObjectAtomic(cfg) + return store.DeleteObjectAtomic(aSpace) } // DataScope method returns the storage scope of the datastore -func (cfg *PoolsConfig) DataScope() datastore.DataScope { - return cfg.scope +func (aSpace *addrSpace) DataScope() string { + aSpace.Lock() + defer aSpace.Unlock() + + return aSpace.scope } diff --git a/libnetwork/ipam/structures.go b/libnetwork/ipam/structures.go index ddbc7a10e0..71a5690aa0 100644 --- a/libnetwork/ipam/structures.go +++ b/libnetwork/ipam/structures.go @@ -27,13 +27,13 @@ type PoolData struct { RefCount int } -// PoolsConfig contains the pool configurations -type PoolsConfig struct { +// addrSpace contains the pool configurations for the address space +type addrSpace struct { subnets map[SubnetKey]*PoolData dbIndex uint64 dbExists bool id string - scope datastore.DataScope + scope string ds datastore.DataStore alloc *Allocator sync.Mutex @@ -153,18 +153,18 @@ func (p *PoolData) UnmarshalJSON(data []byte) error { return nil } -// MarshalJSON returns the JSON encoding of the PoolsConfig object -func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) { - cfg.Lock() - defer cfg.Unlock() +// MarshalJSON returns the JSON encoding of the addrSpace object +func (aSpace *addrSpace) MarshalJSON() ([]byte, error) { + aSpace.Lock() + defer aSpace.Unlock() m := map[string]interface{}{ - "Scope": string(cfg.scope), + "Scope": string(aSpace.scope), } - if cfg.subnets != nil { + if aSpace.subnets != nil { s := map[string]*PoolData{} - for k, v := range cfg.subnets { + for k, v := range aSpace.subnets { s[k.String()] = v } m["Subnets"] = s @@ -173,10 +173,10 @@ func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) { return json.Marshal(m) } -// UnmarshalJSON decodes data into the PoolsConfig object -func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error { - cfg.Lock() - defer cfg.Unlock() +// UnmarshalJSON decodes data into the addrSpace object +func (aSpace *addrSpace) UnmarshalJSON(data []byte) error { + aSpace.Lock() + defer aSpace.Unlock() m := map[string]interface{}{} err := json.Unmarshal(data, &m) @@ -184,10 +184,10 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error { return err } - cfg.scope = datastore.LocalScope + aSpace.scope = datastore.LocalScope s := m["Scope"].(string) if s == string(datastore.GlobalScope) { - cfg.scope = datastore.GlobalScope + aSpace.scope = datastore.GlobalScope } if v, ok := m["Subnets"]; ok { @@ -200,31 +200,81 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error { for ks, v := range s { k := SubnetKey{} k.FromString(ks) - cfg.subnets[k] = v + aSpace.subnets[k] = v } } return nil } -func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) { - cfg.Lock() - defer cfg.Unlock() +// CopyTo deep copies the pool data to the destination pooldata +func (p *PoolData) CopyTo(dstP *PoolData) error { + dstP.ParentKey = p.ParentKey + dstP.Pool = types.GetIPNetCopy(p.Pool) + + if p.Range != nil { + dstP.Range = &AddressRange{} + dstP.Range.Sub = types.GetIPNetCopy(p.Range.Sub) + dstP.Range.Start = p.Range.Start + dstP.Range.End = p.Range.End + } + + dstP.RefCount = p.RefCount + return nil +} + +func (aSpace *addrSpace) CopyTo(o datastore.KVObject) error { + aSpace.Lock() + defer aSpace.Unlock() + + dstAspace := o.(*addrSpace) + + dstAspace.id = aSpace.id + dstAspace.ds = aSpace.ds + dstAspace.alloc = aSpace.alloc + dstAspace.scope = aSpace.scope + dstAspace.dbIndex = aSpace.dbIndex + dstAspace.dbExists = aSpace.dbExists + + dstAspace.subnets = make(map[SubnetKey]*PoolData) + for k, v := range aSpace.subnets { + dstAspace.subnets[k] = &PoolData{} + v.CopyTo(dstAspace.subnets[k]) + } + + return nil +} + +func (aSpace *addrSpace) New() datastore.KVObject { + aSpace.Lock() + defer aSpace.Unlock() + + return &addrSpace{ + id: aSpace.id, + ds: aSpace.ds, + alloc: aSpace.alloc, + scope: aSpace.scope, + } +} + +func (aSpace *addrSpace) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) { + aSpace.Lock() + defer aSpace.Unlock() // Check if already allocated - if p, ok := cfg.subnets[k]; ok { - cfg.incRefCount(p, 1) + if p, ok := aSpace.subnets[k]; ok { + aSpace.incRefCount(p, 1) return func() error { return nil }, nil } // If master pool, check for overlap if ipr == nil { - if cfg.contains(k.AddressSpace, nw) { + if aSpace.contains(k.AddressSpace, nw) { return nil, ipamapi.ErrPoolOverlap } // This is a new master pool, add it along with corresponding bitmask - cfg.subnets[k] = &PoolData{Pool: nw, RefCount: 1} - return func() error { return cfg.alloc.insertBitMask(cfg.ds, k, nw) }, nil + aSpace.subnets[k] = &PoolData{Pool: nw, RefCount: 1} + return func() error { return aSpace.alloc.insertBitMask(k, nw) }, nil } // This is a new non-master pool @@ -234,38 +284,38 @@ func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *Addre Range: ipr, RefCount: 1, } - cfg.subnets[k] = p + aSpace.subnets[k] = p // Look for parent pool - pp, ok := cfg.subnets[p.ParentKey] + pp, ok := aSpace.subnets[p.ParentKey] if ok { - cfg.incRefCount(pp, 1) + aSpace.incRefCount(pp, 1) return func() error { return nil }, nil } // Parent pool does not exist, add it along with corresponding bitmask - cfg.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1} - return func() error { return cfg.alloc.insertBitMask(cfg.ds, p.ParentKey, nw) }, nil + aSpace.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1} + return func() error { return aSpace.alloc.insertBitMask(p.ParentKey, nw) }, nil } -func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) { - cfg.Lock() - defer cfg.Unlock() +func (aSpace *addrSpace) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) { + aSpace.Lock() + defer aSpace.Unlock() - p, ok := cfg.subnets[k] + p, ok := aSpace.subnets[k] if !ok { return nil, ipamapi.ErrBadPool } - cfg.incRefCount(p, -1) + aSpace.incRefCount(p, -1) c := p for ok { if c.RefCount == 0 { - delete(cfg.subnets, k) + delete(aSpace.subnets, k) if c.Range == nil { return func() error { - bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool) + bm, err := aSpace.alloc.retrieveBitmask(k, c.Pool) if err != nil { return fmt.Errorf("could not find bitmask in datastore for pool %s removal: %v", k.String(), err) } @@ -274,24 +324,24 @@ func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) } } k = c.ParentKey - c, ok = cfg.subnets[k] + c, ok = aSpace.subnets[k] } return func() error { return nil }, nil } -func (cfg *PoolsConfig) incRefCount(p *PoolData, delta int) { +func (aSpace *addrSpace) incRefCount(p *PoolData, delta int) { c := p ok := true for ok { c.RefCount += delta - c, ok = cfg.subnets[c.ParentKey] + c, ok = aSpace.subnets[c.ParentKey] } } // Checks whether the passed subnet is a superset or subset of any of the subset in this config db -func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool { - for k, v := range cfg.subnets { +func (aSpace *addrSpace) contains(space string, nw *net.IPNet) bool { + for k, v := range aSpace.subnets { if space == k.AddressSpace && k.ChildSubnet == "" { if nw.Contains(v.Pool.IP) || v.Pool.Contains(nw.IP) { return true @@ -300,3 +350,10 @@ func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool { } return false } + +func (aSpace *addrSpace) store() datastore.DataStore { + aSpace.Lock() + defer aSpace.Unlock() + + return aSpace.ds +} From a22ce0938ccb601e2f98b933a9b57ab6a2cb230b Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Mon, 5 Oct 2015 04:32:40 -0700 Subject: [PATCH 4/4] Add bridge network integration tests Add a few bridge network integration tests which specifically deals with multiple bridge networks and libnetwork restart and persistence Signed-off-by: Jana Radhakrishnan --- libnetwork/test/integration/dnet/bridge.bats | 157 ++++++++++++++++++ libnetwork/test/integration/dnet/helpers.bash | 41 ++++- libnetwork/test/integration/dnet/multi.bats | 33 ++-- libnetwork/test/integration/dnet/overlay.bats | 15 +- .../integration/dnet/run-integration-tests.sh | 12 ++ libnetwork/test/integration/dnet/simple.bats | 4 +- 6 files changed, 231 insertions(+), 31 deletions(-) create mode 100644 libnetwork/test/integration/dnet/bridge.bats diff --git a/libnetwork/test/integration/dnet/bridge.bats b/libnetwork/test/integration/dnet/bridge.bats new file mode 100644 index 0000000000..12c265f0ae --- /dev/null +++ b/libnetwork/test/integration/dnet/bridge.bats @@ -0,0 +1,157 @@ +# -*- mode: sh -*- +#!/usr/bin/env bats + +load helpers + +function test_single_network_connectivity() { + local nw_name start end + + nw_name=${1} + start=1 + end=${2} + + # Create containers and connect them to the network + for i in `seq ${start} ${end}`; + do + dnet_cmd $(inst_id2port 1) container create container_${i} + net_connect 1 container_${i} ${nw_name} + done + + # Now test connectivity between all the containers using service names + for i in `seq ${start} ${end}`; + do + for j in `seq ${start} ${end}`; + do + if [ "$i" -eq "$j" ]; then + continue + fi + runc $(dnet_container_name 1 bridge) $(get_sbox_id 1 container_${i}) \ + "ping -c 1 container_${j}" + done + done + + # Teardown the container connections and the network + for i in `seq ${start} ${end}`; + do + net_disconnect 1 container_${i} ${nw_name} + dnet_cmd $(inst_id2port 1) container rm container_${i} + done +} + +@test "Test default bridge network" { + skip_for_circleci + + echo $(docker ps) + test_single_network_connectivity bridge 3 +} + +@test "Test bridge network" { + skip_for_circleci + + echo $(docker ps) + dnet_cmd $(inst_id2port 1) network create -d bridge singlehost + test_single_network_connectivity singlehost 3 + dnet_cmd $(inst_id2port 1) network rm singlehost +} + +@test "Test bridge network dnet restart" { + skip_for_circleci + + echo $(docker ps) + dnet_cmd $(inst_id2port 1) network create -d bridge singlehost + + for iter in `seq 1 2`; + do + test_single_network_connectivity singlehost 3 + docker restart dnet-1-bridge + sleep 2 + done + + dnet_cmd $(inst_id2port 1) network rm singlehost +} + +@test "Test multiple bridge networks" { + skip_for_circleci + + echo $(docker ps) + + start=1 + end=3 + + for i in `seq ${start} ${end}`; + do + dnet_cmd $(inst_id2port 1) container create container_${i} + for j in `seq ${start} ${end}`; + do + if [ "$i" -eq "$j" ]; then + continue + fi + + if [ "$i" -lt "$j" ]; then + dnet_cmd $(inst_id2port 1) network create -d bridge sh${i}${j} + nw=sh${i}${j} + else + nw=sh${j}${i} + fi + + osvc="svc${i}${j}" + dnet_cmd $(inst_id2port 1) service publish ${osvc}.${nw} + dnet_cmd $(inst_id2port 1) service attach container_${i} ${osvc}.${nw} + done + done + + for i in `seq ${start} ${end}`; + do + echo ${i1} + for j in `seq ${start} ${end}`; + do + echo ${j1} + if [ "$i" -eq "$j" ]; then + continue + fi + + osvc="svc${j}${i}" + echo "pinging ${osvc}" + dnet_cmd $(inst_id2port 1) service ls + runc $(dnet_container_name 1 bridge) $(get_sbox_id 1 container_${i}) "cat /etc/hosts" + runc $(dnet_container_name 1 bridge) $(get_sbox_id 1 container_${i}) "ping -c 1 ${osvc}" + done + done + + for i in `seq ${start} ${end}`; + do + for j in `seq ${start} ${end}`; + do + if [ "$i" -eq "$j" ]; then + continue + fi + + if [ "$i" -lt "$j" ]; then + nw=sh${i}${j} + else + nw=sh${j}${i} + fi + + osvc="svc${i}${j}" + dnet_cmd $(inst_id2port 1) service detach container_${i} ${osvc}.${nw} + dnet_cmd $(inst_id2port 1) service unpublish ${osvc}.${nw} + + done + dnet_cmd $(inst_id2port 1) container rm container_${i} + done + + for i in `seq ${start} ${end}`; + do + for j in `seq ${start} ${end}`; + do + if [ "$i" -eq "$j" ]; then + continue + fi + + if [ "$i" -lt "$j" ]; then + dnet_cmd $(inst_id2port 1) network rm sh${i}${j} + fi + done + done + +} diff --git a/libnetwork/test/integration/dnet/helpers.bash b/libnetwork/test/integration/dnet/helpers.bash index 935ae28092..5a6a6e3237 100644 --- a/libnetwork/test/integration/dnet/helpers.bash +++ b/libnetwork/test/integration/dnet/helpers.bash @@ -6,6 +6,23 @@ function dnet_container_name() { echo dnet-$1-$2 } +function get_sbox_id() { + local line + + line=$(dnet_cmd $(inst_id2port ${1}) service ls | grep ${2}) + echo ${line} | cut -d" " -f5 +} + +function net_connect() { + dnet_cmd $(inst_id2port ${1}) service publish ${2}.${3} + dnet_cmd $(inst_id2port ${1}) service attach ${2} ${2}.${3} +} + +function net_disconnect() { + dnet_cmd $(inst_id2port ${1}) service detach ${2} ${2}.${3} + dnet_cmd $(inst_id2port ${1}) service unpublish ${2}.${3} +} + function start_consul() { stop_consul docker run -d \ @@ -28,6 +45,7 @@ function stop_consul() { } function start_dnet() { + local inst suffix name hport cport hopt neighip bridge_ip labels tomlfile inst=$1 shift suffix=$1 @@ -39,7 +57,6 @@ function start_dnet() { hport=$((41000+${inst}-1)) cport=2385 hopt="" - isnum='^[0-9]+$' while [ -n "$1" ] do @@ -62,10 +79,12 @@ function start_dnet() { labels="\"com.docker.network.driver.overlay.bind_interface=eth0\", \"com.docker.network.driver.overlay.neighbor_ip=${neighip}\"" fi + echo "parsed values: " ${name} ${hport} ${cport} ${hopt} ${neighip} ${labels} + mkdir -p /tmp/dnet/${name} tomlfile="/tmp/dnet/${name}/libnetwork.toml" cat > ${tomlfile} <>${INTEGRATION_ROOT}/test.log 2>&1 unset cmap[dnet-3-multi] +## Setup +start_dnet 1 bridge 1>>${INTEGRATION_ROOT}/test.log 2>&1 +cmap[dnet-1-bridge]=dnet-1-bridge + +## Run the test cases +./integration-tmp/bin/bats ./test/integration/dnet/bridge.bats +#docker logs dnet-1-bridge + +## Teardown +stop_dnet 1 bridge 1>>${INTEGRATION_ROOT}/test.log 2>&1 +unset cmap[dnet-1-bridge] + ## Setup start_dnet 1 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1 cmap[dnet-1-overlay]=dnet-1-overlay diff --git a/libnetwork/test/integration/dnet/simple.bats b/libnetwork/test/integration/dnet/simple.bats index 5b96797472..96365eaae3 100644 --- a/libnetwork/test/integration/dnet/simple.bats +++ b/libnetwork/test/integration/dnet/simple.bats @@ -7,6 +7,8 @@ load helpers run dnet_cmd $(inst_id2port 1) network create -d test mh1 echo ${output} [ "$status" -eq 0 ] + run dnet_cmd $(inst_id2port 1) network ls + echo ${output} line=$(dnet_cmd $(inst_id2port 1) network ls | grep mh1) echo ${line} name=$(echo ${line} | cut -d" " -f2) @@ -32,9 +34,9 @@ load helpers echo ${output} [ "$status" -eq 0 ] run dnet_cmd $(inst_id2port 1) service ls - [ "$status" -eq 0 ] echo ${output} echo ${lines[1]} + [ "$status" -eq 0 ] svc=$(echo ${lines[1]} | cut -d" " -f2) network=$(echo ${lines[1]} | cut -d" " -f3) echo ${svc} ${network}