diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 442473eb20..62a6266a58 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -48,8 +48,10 @@ package libnetwork import ( "sync" + log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/stringid" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/sandbox" "github.com/docker/libnetwork/types" @@ -95,6 +97,7 @@ type controller struct { networks networkTable drivers driverTable sandboxes sandboxTable + store datastore.DataStore sync.Mutex } @@ -107,6 +110,18 @@ func New() (NetworkController, error) { if err := initDrivers(c); err != nil { return nil, err } + + /* TODO : Duh ! make this configurable :-) */ + config := &datastore.StoreConfiguration{} + config.Provider = "consul" + config.Addrs = []string{"localhost:8500"} + + store, err := datastore.NewDataStore(config) + if err != nil { + log.Error("Failed to connect with Consul server") + } + c.store = store + return c, nil } @@ -176,6 +191,7 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti // Store the network handler in controller c.Lock() c.networks[network.id] = network + c.store.PutObjectAtomic(network) c.Unlock() return network, nil diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go new file mode 100644 index 0000000000..0b1d954116 --- /dev/null +++ b/libnetwork/datastore/datastore.go @@ -0,0 +1,111 @@ +package datastore + +import ( + "errors" + "strings" + + "github.com/docker/swarm/pkg/store" +) + +//DataStore exported +type DataStore interface { + // PutObject adds a new Record based on an object into the datastore + PutObject(kvObject KV) error + // PutObjectAtomic provides an atomic add and update operation for a Record + PutObjectAtomic(kvObject KV) error + // KVStore returns access to the KV Store + KVStore() store.Store +} + +type datastore struct { + store store.Store + config *StoreConfiguration +} + +//StoreConfiguration exported +type StoreConfiguration struct { + Addrs []string + Provider string +} + +//KV Key Value interface used by objects to be part of the DataStore +type KV interface { + Key() []string + Value() []byte + Index() uint64 + SetIndex(uint64) +} + +//Key provides convenient method to create a Key +func Key(key ...string) string { + keychain := []string{"docker", "libnetwork"} + keychain = append(keychain, key...) + str := strings.Join(keychain, "/") + return str + "/" +} + +var errNewDatastore = errors.New("Error creating new Datastore") +var errInvalidConfiguration = errors.New("Invalid Configuration passed to Datastore") +var errInvalidAtomicRequest = errors.New("Invalid Atomic Request") + +// newClient used to connect to KV Store +func newClient(kv string, addrs []string) (DataStore, error) { + store, err := store.CreateStore(kv, addrs, store.Config{}) + if err != nil { + return nil, err + } + ds := &datastore{store: store} + return ds, nil +} + +// NewDataStore creates a new instance of LibKV data store +func NewDataStore(config *StoreConfiguration) (DataStore, error) { + if config == nil { + return nil, errInvalidConfiguration + } + return newClient(config.Provider, config.Addrs) +} + +func (ds *datastore) KVStore() store.Store { + return ds.store +} + +// PutObjectAtomic adds a new Record based on an object into the datastore +func (ds *datastore) PutObjectAtomic(kvObject KV) error { + if kvObject == nil { + return errors.New("kvObject is nil") + } + kvObjValue := kvObject.Value() + + if kvObjValue == nil { + return errInvalidAtomicRequest + } + _, err := ds.store.AtomicPut(Key(kvObject.Key()...), []byte{}, kvObjValue, kvObject.Index()) + if err != nil { + return err + } + + _, index, err := ds.store.Get(Key(kvObject.Key()...)) + if err != nil { + return err + } + kvObject.SetIndex(index) + return nil +} + +// PutObject adds a new Record based on an object into the datastore +func (ds *datastore) PutObject(kvObject KV) error { + if kvObject == nil { + return errors.New("kvObject is nil") + } + return ds.putObjectWithKey(kvObject, kvObject.Key()...) +} + +func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error { + kvObjValue := kvObject.Value() + + if kvObjValue == nil { + return errors.New("Object must provide marshalled data for key : " + Key(kvObject.Key()...)) + } + return ds.store.Put(Key(key...), kvObjValue) +} diff --git a/libnetwork/datastore/datastore_test.go b/libnetwork/datastore/datastore_test.go new file mode 100644 index 0000000000..4ec0dce3ea --- /dev/null +++ b/libnetwork/datastore/datastore_test.go @@ -0,0 +1,177 @@ +package datastore + +import ( + "encoding/json" + "testing" + + _ "github.com/docker/libnetwork/netutils" + "github.com/docker/libnetwork/options" +) + +var dummyKey = "dummy" + +func TestInvalidDataStore(t *testing.T) { + config := &StoreConfiguration{} + config.Provider = "invalid" + config.Addrs = []string{"localhost:8500"} + _, err := NewDataStore(config) + if err == nil { + t.Fatal("Invalid Datastore connection configuration must result in a failure") + } +} + +func TestKVObjectFlatKey(t *testing.T) { + mockStore := newMockStore() + store := datastore{store: mockStore} + expected := dummyKVObject("1000", true) + err := store.PutObject(expected) + if err != nil { + t.Fatal(err) + } + keychain := []string{dummyKey, "1000"} + data, _, err := store.KVStore().Get(Key(keychain...)) + if err != nil { + t.Fatal(err) + } + var n dummyObject + json.Unmarshal(data, &n) + if n.Name != expected.Name { + t.Fatalf("Dummy object doesnt match the expected object") + } +} + +func TestAtomicKVObjectFlatKey(t *testing.T) { + mockStore := newMockStore() + store := datastore{store: mockStore} + expected := dummyKVObject("1111", true) + err := store.PutObjectAtomic(expected) + if err != nil { + t.Fatal(err) + } + + // PutObjectAtomic automatically sets the Index again. Hence the following must pass. + + err = store.PutObjectAtomic(expected) + if err != nil { + t.Fatal("Atomic update with an older Index must fail") + } + + // Get the latest index and try PutObjectAtomic again for the same Key + // This must succeed as well + data, index, err := store.KVStore().Get(Key(expected.Key()...)) + if err != nil { + t.Fatal(err) + } + n := dummyObject{} + json.Unmarshal(data, &n) + n.ID = "1111" + n.DBIndex = index + n.ReturnValue = true + err = store.PutObjectAtomic(&n) + if err != nil { + t.Fatal(err) + } +} + +// dummy data used to test the datastore +type dummyObject struct { + Name string `kv:"leaf"` + NetworkType string `kv:"leaf"` + EnableIPv6 bool `kv:"leaf"` + Rec *recStruct `kv:"recursive"` + Dict map[string]*recStruct `kv:"iterative"` + Generic options.Generic `kv:"iterative"` + ID string + DBIndex uint64 + ReturnValue bool +} + +func (n *dummyObject) Key() []string { + return []string{dummyKey, n.ID} +} +func (n *dummyObject) Value() []byte { + if !n.ReturnValue { + return nil + } + + b, err := json.Marshal(n) + if err != nil { + return nil + } + return b +} + +func (n *dummyObject) Index() uint64 { + return n.DBIndex +} + +func (n *dummyObject) SetIndex(index uint64) { + n.DBIndex = index +} + +func (n *dummyObject) MarshalJSON() ([]byte, error) { + netMap := make(map[string]interface{}) + netMap["name"] = n.Name + netMap["networkType"] = n.NetworkType + netMap["enableIPv6"] = n.EnableIPv6 + netMap["generic"] = n.Generic + return json.Marshal(netMap) +} + +func (n *dummyObject) UnmarshalJSON(b []byte) (err error) { + var netMap map[string]interface{} + if err := json.Unmarshal(b, &netMap); err != nil { + return err + } + n.Name = netMap["name"].(string) + n.NetworkType = netMap["networkType"].(string) + n.EnableIPv6 = netMap["enableIPv6"].(bool) + n.Generic = netMap["generic"].(map[string]interface{}) + return nil +} + +// dummy structure to test "recursive" cases +type recStruct struct { + Name string `kv:"leaf"` + Field1 int `kv:"leaf"` + Dict map[string]string `kv:"iterative"` + DBIndex uint64 +} + +func (r *recStruct) Key() []string { + return []string{"recStruct"} +} +func (r *recStruct) Value() []byte { + b, err := json.Marshal(r) + if err != nil { + return nil + } + return b +} + +func (r *recStruct) Index() uint64 { + return r.DBIndex +} + +func (r *recStruct) SetIndex(index uint64) { + r.DBIndex = index +} + +func dummyKVObject(id string, retValue bool) *dummyObject { + cDict := make(map[string]string) + cDict["foo"] = "bar" + cDict["hello"] = "world" + n := dummyObject{ + Name: "testNw", + NetworkType: "bridge", + EnableIPv6: true, + Rec: &recStruct{"gen", 5, cDict, 0}, + ID: id, + DBIndex: 0, + ReturnValue: retValue} + generic := make(map[string]interface{}) + generic["label1"] = &recStruct{"value1", 1, cDict, 0} + generic["label2"] = "subnet=10.1.1.0/16" + n.Generic = generic + return &n +} diff --git a/libnetwork/datastore/mock_store.go b/libnetwork/datastore/mock_store.go new file mode 100644 index 0000000000..ec95f151ff --- /dev/null +++ b/libnetwork/datastore/mock_store.go @@ -0,0 +1,131 @@ +package datastore + +import ( + "errors" + "time" + + "github.com/docker/swarm/pkg/store" +) + +var ( + // ErrNotImplmented exported + ErrNotImplmented = errors.New("Functionality not implemented") +) + +// MockData exported +type MockData struct { + Data []byte + Index uint64 +} + +// MockStore exported +type MockStore struct { + db map[string]*MockData +} + +// NewMockStore creates a Map backed Datastore that is useful for mocking +func NewMockStore() *MockStore { + db := make(map[string]*MockData) + return &MockStore{db} +} + +// Get the value at "key", returns the last modified index +// to use in conjunction to CAS calls +func (s *MockStore) Get(key string) (value []byte, lastIndex uint64, err error) { + mData := s.db[key] + if mData == nil { + return nil, 0, nil + } + return mData.Data, mData.Index, nil + +} + +// Put a value at "key" +func (s *MockStore) Put(key string, value []byte) error { + mData := s.db[key] + if mData == nil { + mData = &MockData{value, 0} + } + mData.Index = mData.Index + 1 + s.db[key] = mData + return nil +} + +// Delete a value at "key" +func (s *MockStore) Delete(key string) error { + delete(s.db, key) + return nil +} + +// Exists checks that the key exists inside the store +func (s *MockStore) Exists(key string) (bool, error) { + _, ok := s.db[key] + return ok, nil +} + +// GetRange gets a range of values at "directory" +func (s *MockStore) GetRange(prefix string) (values []store.KVEntry, err error) { + return nil, ErrNotImplmented +} + +// DeleteRange deletes a range of values at "directory" +func (s *MockStore) DeleteRange(prefix string) error { + return ErrNotImplmented +} + +// Watch a single key for modifications +func (s *MockStore) Watch(key string, heartbeat time.Duration, callback store.WatchCallback) error { + return ErrNotImplmented +} + +// CancelWatch cancels a watch, sends a signal to the appropriate +// stop channel +func (s *MockStore) CancelWatch(key string) error { + return ErrNotImplmented +} + +// Internal function to check if a key has changed +func (s *MockStore) waitForChange(key string) <-chan uint64 { + return nil +} + +// WatchRange triggers a watch on a range of values at "directory" +func (s *MockStore) WatchRange(prefix string, filter string, heartbeat time.Duration, callback store.WatchCallback) error { + return ErrNotImplmented +} + +// CancelWatchRange stops the watch on the range of values, sends +// a signal to the appropriate stop channel +func (s *MockStore) CancelWatchRange(prefix string) error { + return ErrNotImplmented +} + +// Acquire the lock for "key"/"directory" +func (s *MockStore) Acquire(key string, value []byte) (string, error) { + return "", ErrNotImplmented +} + +// Release the lock for "key"/"directory" +func (s *MockStore) Release(id string) error { + return ErrNotImplmented +} + +// AtomicPut put a value at "key" if the key has not been +// modified in the meantime, throws an error if this is the case +func (s *MockStore) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { + mData := s.db[key] + if mData != nil && mData.Index != index { + return false, errInvalidAtomicRequest + } + return true, s.Put(key, newValue) +} + +// AtomicDelete deletes a value at "key" if the key has not +// been modified in the meantime, throws an error if this is the case +func (s *MockStore) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { + mData := s.db[key] + if mData != nil && mData.Index != index { + return false, errInvalidAtomicRequest + } + return true, s.Delete(key) +} diff --git a/libnetwork/network.go b/libnetwork/network.go index 36938a5458..ccee297dad 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -1,6 +1,7 @@ package libnetwork import ( + "encoding/json" "sync" "github.com/docker/docker/pkg/stringid" @@ -56,6 +57,7 @@ type network struct { enableIPv6 bool endpoints endpointTable generic options.Generic + dbIndex uint64 sync.Mutex } @@ -75,6 +77,51 @@ func (n *network) Type() string { return n.driver.Type() } +func (n *network) Key() []string { + return []string{"network", string(n.id)} +} + +func (n *network) Value() []byte { + b, err := json.Marshal(n) + if err != nil { + return nil + } + return b +} + +func (n *network) Index() uint64 { + return n.dbIndex +} + +func (n *network) SetIndex(index uint64) { + n.dbIndex = index +} + +// TODO : Can be made much more generic with the help of reflection (but has some golang limitations) +func (n *network) MarshalJSON() ([]byte, error) { + netMap := make(map[string]interface{}) + netMap["name"] = n.name + netMap["id"] = string(n.id) + netMap["networkType"] = n.networkType + netMap["enableIPv6"] = n.enableIPv6 + netMap["generic"] = n.generic + return json.Marshal(netMap) +} + +// TODO : Can be made much more generic with the help of reflection (but has some golang limitations) +func (n *network) UnmarshalJSON(b []byte) (err error) { + var netMap map[string]interface{} + if err := json.Unmarshal(b, &netMap); err != nil { + return err + } + n.name = netMap["name"].(string) + n.id = netMap["id"].(types.UUID) + n.networkType = netMap["networkType"].(string) + n.enableIPv6 = netMap["enableIPv6"].(bool) + n.generic = netMap["generic"].(map[string]interface{}) + return nil +} + // NetworkOption is a option setter function type used to pass varios options to // NewNetwork method. The various setter functions of type NetworkOption are // provided by libnetwork, they look like NetworkOptionXXXX(...)