From 0fda541b378501e300ef36149dbe7b9eaf4cb182 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Fri, 29 May 2015 20:42:23 -0700 Subject: [PATCH] Updating to new Swarm discovery and store APIs Signed-off-by: Madhu Venugopal --- libnetwork/cmd/test/libnetwork.toml | 4 +- libnetwork/cmd/test/main.go | 11 ++- libnetwork/controller.go | 88 +++++++++++-------- libnetwork/datastore/datastore.go | 14 ++- libnetwork/datastore/datastore_test.go | 10 +-- libnetwork/datastore/mock_store.go | 71 ++++++--------- libnetwork/hostdiscovery/hostdiscovery.go | 53 +++++------ .../hostdiscovery/hostdiscovery_test.go | 4 +- 8 files changed, 129 insertions(+), 126 deletions(-) diff --git a/libnetwork/cmd/test/libnetwork.toml b/libnetwork/cmd/test/libnetwork.toml index eac50c3665..e3bfc82099 100644 --- a/libnetwork/cmd/test/libnetwork.toml +++ b/libnetwork/cmd/test/libnetwork.toml @@ -3,8 +3,8 @@ title = "LibNetwork Configuration file" [daemon] debug = false [cluster] - discovery = "token://ce5b9756aeab50fe8fda02624f093d1c" - Address = "1.1.1.1:90" + discovery = "token://22aa23948f4f6b31230687689636959e" + Address = "2.1.1.1" [datastore] embedded = false [datastore.client] diff --git a/libnetwork/cmd/test/main.go b/libnetwork/cmd/test/main.go index ed4c0165d1..fd489a2330 100644 --- a/libnetwork/cmd/test/main.go +++ b/libnetwork/cmd/test/main.go @@ -2,16 +2,18 @@ package main import ( "fmt" - "log" "net" "os" "time" + log "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork" "github.com/docker/libnetwork/options" ) func main() { + log.SetLevel(log.DebugLevel) os.Setenv("LIBNETWORK_CFG", "libnetwork.toml") controller, err := libnetwork.New("libnetwork.toml") if err != nil { @@ -27,9 +29,12 @@ func main() { for i := 0; i < 100; i++ { netw, err := controller.NewNetwork(netType, fmt.Sprintf("Gordon-%d", i)) if err != nil { - log.Fatal(err) + if _, ok := err.(libnetwork.NetworkNameError); !ok { + log.Fatal(err) + } + } else { + fmt.Println("Network Created Successfully :", netw) } - fmt.Println("Network Created Successfully :", netw) time.Sleep(10 * time.Second) } } diff --git a/libnetwork/controller.go b/libnetwork/controller.go index f5aaac5b37..6d1f26cf7d 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -61,7 +61,6 @@ import ( "github.com/docker/libnetwork/hostdiscovery" "github.com/docker/libnetwork/sandbox" "github.com/docker/libnetwork/types" - "github.com/docker/swarm/pkg/store" ) // NetworkController provides the interface for controller instance which manages @@ -104,6 +103,7 @@ type controller struct { sandboxes sandboxTable cfg *config.Config store datastore.DataStore + stopChan chan struct{} sync.Mutex } @@ -133,6 +133,7 @@ func New(configFile string) (NetworkController, error) { // But without that, datastore cannot be initialized. log.Debugf("Unable to Parse LibNetwork Config file : %v", err) } + c.stopChan = make(chan struct{}) return c, nil } @@ -172,9 +173,7 @@ func (c *controller) initDataStore() error { c.Lock() c.store = store c.Unlock() - go c.watchNewNetworks() - - return nil + return c.watchNewNetworks() } func (c *controller) initDiscovery() error { @@ -242,17 +241,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti // Construct the network object network := &network{ - name: name, - id: types.UUID(stringid.GenerateRandomID()), - ctrlr: c, - driver: d, - endpoints: endpointTable{}, + name: name, + networkType: networkType, + id: types.UUID(stringid.GenerateRandomID()), + ctrlr: c, + driver: d, + endpoints: endpointTable{}, } network.processOptions(options...) - if err := c.addNetworkToStore(network); err != nil { - return nil, err - } // Create the network if err := d.CreateNetwork(network.id, network.generic); err != nil { return nil, err @@ -263,6 +260,10 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti c.networks[network.id] = network c.Unlock() + if err := c.addNetworkToStore(network); err != nil { + return nil, err + } + return network, nil } @@ -291,38 +292,53 @@ func (c *controller) addNetworkToStore(n *network) error { log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name()) return nil } - return cs.PutObjectAtomic(n) + + // Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875, + // Also Network object is Keyed with UUID & hence an Atomic put is not mandatory. + // return cs.PutObjectAtomic(n) + + return cs.PutObject(n) } -func (c *controller) watchNewNetworks() { +func (c *controller) watchNewNetworks() error { c.Lock() cs := c.store c.Unlock() - cs.KVStore().WatchRange(datastore.Key(datastore.NetworkKeyPrefix), "", 0, func(kvi []store.KVEntry) { - for _, kve := range kvi { - var n network - err := json.Unmarshal(kve.Value(), &n) - if err != nil { - log.Error(err) - continue - } - n.dbIndex = kve.LastIndex() - c.Lock() - existing, ok := c.networks[n.id] - c.Unlock() - if ok && existing.dbIndex == n.dbIndex { - // Skip any watch notification for a network that has not changed - continue - } else if ok { - // Received an update for an existing network object - log.Debugf("Skipping network update for %s (%s)", n.name, n.id) - continue - } + kvPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan) + if err != nil { + return err + } + go func() { + for { + select { + case kvs := <-kvPairs: + for _, kve := range kvs { + var n network + err := json.Unmarshal(kve.Value, &n) + if err != nil { + log.Error(err) + continue + } + n.dbIndex = kve.LastIndex + c.Lock() + existing, ok := c.networks[n.id] + c.Unlock() + if ok && existing.dbIndex == n.dbIndex { + // Skip any watch notification for a network that has not changed + continue + } else if ok { + // Received an update for an existing network object + log.Debugf("Skipping network update for %s (%s)", n.name, n.id) + continue + } - c.newNetworkFromStore(&n) + c.newNetworkFromStore(&n) + } + } } - }) + }() + return nil } func (c *controller) Networks() []Network { diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 3199deab02..9f512b07df 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -53,7 +53,7 @@ var errInvalidAtomicRequest = errors.New("Invalid Atomic Request") // newClient used to connect to KV Store func newClient(kv string, addrs string) (DataStore, error) { - store, err := store.CreateStore(kv, []string{addrs}, store.Config{}) + store, err := store.NewStore(store.Backend(kv), []string{addrs}, &store.Config{}) if err != nil { return nil, err } @@ -89,16 +89,14 @@ func (ds *datastore) PutObjectAtomic(kvObject KV) error { if kvObjValue == nil { return errInvalidAtomicRequest } - _, err := ds.store.AtomicPut(Key(kvObject.Key()...), []byte{}, kvObjValue, kvObject.Index()) + + previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} + _, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil) if err != nil { return err } - _, index, err := ds.store.Get(Key(kvObject.Key()...)) - if err != nil { - return err - } - kvObject.SetIndex(index) + kvObject.SetIndex(pair.LastIndex) return nil } @@ -116,5 +114,5 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error { if kvObjValue == nil { return errors.New("Object must provide marshalled data for key : " + Key(kvObject.Key()...)) } - return ds.store.Put(Key(key...), kvObjValue) + return ds.store.Put(Key(key...), kvObjValue, nil) } diff --git a/libnetwork/datastore/datastore_test.go b/libnetwork/datastore/datastore_test.go index 956d87b456..08731be9a6 100644 --- a/libnetwork/datastore/datastore_test.go +++ b/libnetwork/datastore/datastore_test.go @@ -35,12 +35,12 @@ func TestKVObjectFlatKey(t *testing.T) { t.Fatal(err) } keychain := []string{dummyKey, "1000"} - data, _, err := store.KVStore().Get(Key(keychain...)) + data, err := store.KVStore().Get(Key(keychain...)) if err != nil { t.Fatal(err) } var n dummyObject - json.Unmarshal(data, &n) + json.Unmarshal(data.Value, &n) if n.Name != expected.Name { t.Fatalf("Dummy object doesnt match the expected object") } @@ -63,14 +63,14 @@ func TestAtomicKVObjectFlatKey(t *testing.T) { // Get the latest index and try PutObjectAtomic again for the same Key // This must succeed as well - data, index, err := store.KVStore().Get(Key(expected.Key()...)) + data, err := store.KVStore().Get(Key(expected.Key()...)) if err != nil { t.Fatal(err) } n := dummyObject{} - json.Unmarshal(data, &n) + json.Unmarshal(data.Value, &n) n.ID = "1111" - n.DBIndex = index + n.DBIndex = data.LastIndex n.ReturnValue = true err = store.PutObjectAtomic(&n) if err != nil { diff --git a/libnetwork/datastore/mock_store.go b/libnetwork/datastore/mock_store.go index 1d52bd4932..84084c915c 100644 --- a/libnetwork/datastore/mock_store.go +++ b/libnetwork/datastore/mock_store.go @@ -2,7 +2,6 @@ package datastore import ( "errors" - "time" "github.com/docker/swarm/pkg/store" ) @@ -31,17 +30,17 @@ func NewMockStore() *MockStore { // Get the value at "key", returns the last modified index // to use in conjunction to CAS calls -func (s *MockStore) Get(key string) (value []byte, lastIndex uint64, err error) { +func (s *MockStore) Get(key string) (*store.KVPair, error) { mData := s.db[key] if mData == nil { - return nil, 0, nil + return nil, nil } - return mData.Data, mData.Index, nil + return &store.KVPair{Value: mData.Data, LastIndex: mData.Index}, nil } // Put a value at "key" -func (s *MockStore) Put(key string, value []byte) error { +func (s *MockStore) Put(key string, value []byte, options *store.WriteOptions) error { mData := s.db[key] if mData == nil { mData = &MockData{value, 0} @@ -63,68 +62,50 @@ func (s *MockStore) Exists(key string) (bool, error) { return ok, nil } -// GetRange gets a range of values at "directory" -func (s *MockStore) GetRange(prefix string) (values []store.KVEntry, err error) { +// List gets a range of values at "directory" +func (s *MockStore) List(prefix string) ([]*store.KVPair, error) { return nil, ErrNotImplmented } -// DeleteRange deletes a range of values at "directory" -func (s *MockStore) DeleteRange(prefix string) error { +// DeleteTree deletes a range of values at "directory" +func (s *MockStore) DeleteTree(prefix string) error { return ErrNotImplmented } // Watch a single key for modifications -func (s *MockStore) Watch(key string, heartbeat time.Duration, callback store.WatchCallback) error { - return ErrNotImplmented +func (s *MockStore) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { + return nil, ErrNotImplmented } -// CancelWatch cancels a watch, sends a signal to the appropriate -// stop channel -func (s *MockStore) CancelWatch(key string) error { - return ErrNotImplmented +// WatchTree triggers a watch on a range of values at "directory" +func (s *MockStore) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { + return nil, ErrNotImplmented } -// Internal function to check if a key has changed -func (s *MockStore) waitForChange(key string) <-chan uint64 { - return nil -} - -// WatchRange triggers a watch on a range of values at "directory" -func (s *MockStore) WatchRange(prefix string, filter string, heartbeat time.Duration, callback store.WatchCallback) error { - return ErrNotImplmented -} - -// CancelWatchRange stops the watch on the range of values, sends -// a signal to the appropriate stop channel -func (s *MockStore) CancelWatchRange(prefix string) error { - return ErrNotImplmented -} - -// Acquire the lock for "key"/"directory" -func (s *MockStore) Acquire(key string, value []byte) (string, error) { - return "", ErrNotImplmented -} - -// Release the lock for "key"/"directory" -func (s *MockStore) Release(id string) error { - return ErrNotImplmented +// NewLock exposed +func (s *MockStore) NewLock(key string, options *store.LockOptions) (store.Locker, error) { + return nil, ErrNotImplmented } // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *MockStore) AtomicPut(key string, _ []byte, newValue []byte, index uint64) (bool, error) { +func (s *MockStore) AtomicPut(key string, newValue []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { mData := s.db[key] - if mData != nil && mData.Index != index { - return false, errInvalidAtomicRequest + if mData != nil && mData.Index != previous.LastIndex { + return false, nil, errInvalidAtomicRequest } - return true, s.Put(key, newValue) + err := s.Put(key, newValue, nil) + if err != nil { + return false, nil, err + } + return true, &store.KVPair{Key: key, Value: newValue, LastIndex: s.db[key].Index}, nil } // AtomicDelete deletes a value at "key" if the key has not // been modified in the meantime, throws an error if this is the case -func (s *MockStore) AtomicDelete(key string, oldValue []byte, index uint64) (bool, error) { +func (s *MockStore) AtomicDelete(key string, previous *store.KVPair) (bool, error) { mData := s.db[key] - if mData != nil && mData.Index != index { + if mData != nil && mData.Index != previous.LastIndex { return false, errInvalidAtomicRequest } return true, s.Delete(key) diff --git a/libnetwork/hostdiscovery/hostdiscovery.go b/libnetwork/hostdiscovery/hostdiscovery.go index 57a187e9e5..509592ba0d 100644 --- a/libnetwork/hostdiscovery/hostdiscovery.go +++ b/libnetwork/hostdiscovery/hostdiscovery.go @@ -24,7 +24,7 @@ import ( _ "github.com/docker/swarm/discovery/token" ) -const defaultHeartbeat = 10 +const defaultHeartbeat = time.Duration(10) * time.Second type hostDiscovery struct { discovery discovery.Discovery @@ -43,17 +43,17 @@ func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback Join return fmt.Errorf("discovery requires a valid configuration") } - hb := cfg.Heartbeat + hb := time.Duration(cfg.Heartbeat) * time.Second if hb == 0 { hb = defaultHeartbeat } - d, err := discovery.New(cfg.Discovery, hb) + d, err := discovery.New(cfg.Discovery, hb, 3*hb) if err != nil { return err } if ip := net.ParseIP(cfg.Address); ip == nil { - return errors.New("Address config should be either ipv4 or ipv6 address") + return errors.New("address config should be either ipv4 or ipv6 address") } if err := d.Register(cfg.Address + ":0"); err != nil { @@ -64,14 +64,25 @@ func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback Join h.discovery = d h.Unlock() - go d.Watch(func(entries []*discovery.Entry) { - h.processCallback(entries, joinCallback, leaveCallback) - }) - - go sustainHeartbeat(d, hb, cfg, h.stopChan) + discoveryCh, errCh := d.Watch(h.stopChan) + go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback) + go h.sustainHeartbeat(d, hb, cfg) return nil } +func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, joinCallback JoinCallback, leaveCallback LeaveCallback) { + for { + select { + case entries := <-ch: + h.processCallback(entries, joinCallback, leaveCallback) + case err := <-errCh: + log.Errorf("discovery error: %v", err) + case <-h.stopChan: + return + } + } +} + func (h *hostDiscovery) StopDiscovery() error { h.Lock() stopChan := h.stopChan @@ -82,12 +93,12 @@ func (h *hostDiscovery) StopDiscovery() error { return nil } -func sustainHeartbeat(d discovery.Discovery, hb uint64, config *config.ClusterCfg, stopChan chan struct{}) { +func (h *hostDiscovery) sustainHeartbeat(d discovery.Discovery, hb time.Duration, config *config.ClusterCfg) { for { select { - case <-stopChan: + case <-h.stopChan: return - case <-time.After(time.Duration(hb) * time.Second): + case <-time.After(hb): if err := d.Register(config.Address + ":0"); err != nil { log.Warn(err) } @@ -95,7 +106,7 @@ func sustainHeartbeat(d discovery.Discovery, hb uint64, config *config.ClusterCf } } -func (h *hostDiscovery) processCallback(entries []*discovery.Entry, joinCallback JoinCallback, leaveCallback LeaveCallback) { +func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) { updated := hosts(entries) h.Lock() existing := h.nodes @@ -125,23 +136,15 @@ func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []ne func (h *hostDiscovery) Fetch() ([]net.IP, error) { h.Lock() - hd := h.discovery - h.Unlock() - if hd == nil { - return nil, errors.New("No Active Discovery") - } - entries, err := hd.Fetch() - if err != nil { - return nil, err - } + defer h.Unlock() ips := []net.IP{} - for _, entry := range entries { - ips = append(ips, net.ParseIP(entry.Host)) + for _, ipstr := range h.nodes.ToSlice() { + ips = append(ips, net.ParseIP(ipstr.(string))) } return ips, nil } -func hosts(entries []*discovery.Entry) mapset.Set { +func hosts(entries discovery.Entries) mapset.Set { hosts := mapset.NewSet() for _, entry := range entries { hosts.Add(entry.Host) diff --git a/libnetwork/hostdiscovery/hostdiscovery_test.go b/libnetwork/hostdiscovery/hostdiscovery_test.go index 43b4c82fbd..1b48d1594c 100644 --- a/libnetwork/hostdiscovery/hostdiscovery_test.go +++ b/libnetwork/hostdiscovery/hostdiscovery_test.go @@ -15,7 +15,7 @@ import ( ) func TestDiscovery(t *testing.T) { - _, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80") + _, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second) if err != nil { t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com") } @@ -52,7 +52,7 @@ func TestDiscovery(t *testing.T) { } func TestBadDiscovery(t *testing.T) { - _, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80") + _, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second) if err != nil { t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com") }