diff --git a/libnetwork/controller.go b/libnetwork/controller.go index a2ab5046ef..b66fe1d8ef 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -263,6 +263,9 @@ func (c *controller) addNetwork(n *network) error { if err := d.CreateNetwork(n.id, n.generic); err != nil { return err } + if err := n.watchEndpoints(); err != nil { + return err + } c.Lock() c.networks[n.id] = n c.Unlock() diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 9f8d500104..cde605dce5 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -30,7 +30,10 @@ type DataStore interface { } // ErrKeyModified is raised for an atomic update when the update is working on a stale state -var ErrKeyModified = store.ErrKeyModified +var ( + ErrKeyModified = store.ErrKeyModified + ErrKeyNotFound = store.ErrKeyNotFound +) type datastore struct { store store.Store diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 65dd91cfbc..a4bce82a5b 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -554,7 +554,7 @@ func (ep *endpoint) deleteEndpoint() error { _, ok := n.endpoints[epid] if !ok { n.Unlock() - return &UnknownEndpointError{name: name, id: string(epid)} + return nil } nid := n.id diff --git a/libnetwork/libnetwork_test.go b/libnetwork/libnetwork_test.go index 81c8b91c1f..66813d2c83 100644 --- a/libnetwork/libnetwork_test.go +++ b/libnetwork/libnetwork_test.go @@ -547,15 +547,6 @@ func TestUnknownEndpoint(t *testing.T) { t.Fatal(err) } - err = ep.Delete() - if err == nil { - t.Fatal("Expected to fail. But instead succeeded") - } - - if _, ok := err.(*libnetwork.UnknownEndpointError); !ok { - t.Fatalf("Did not fail with expected error. Actual error: %v", err) - } - // Done testing. Now cleanup if err := network.Delete(); err != nil { t.Fatal(err) diff --git a/libnetwork/network.go b/libnetwork/network.go index 28369656f4..1c48832ee7 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -62,6 +62,7 @@ type network struct { endpoints endpointTable generic options.Generic dbIndex uint64 + stopWatchCh chan struct{} sync.Mutex } @@ -248,6 +249,7 @@ func (n *network) deleteNetwork() error { } log.Warnf("driver error deleting network %s : %v", n.name, err) } + n.stopWatch() return nil } diff --git a/libnetwork/store.go b/libnetwork/store.go index b1b1b5416f..f4e1fe4536 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -31,7 +31,7 @@ func (c *controller) initDataStore() error { c.Lock() c.store = store c.Unlock() - return c.watchStore() + return c.watchNetworks() } func (c *controller) newNetworkFromStore(n *network) error { @@ -92,22 +92,6 @@ func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { n := ep.network id := ep.id ep.Unlock() - if n == nil { - // Possibly the watch event for the network has not shown up yet - // Try to get network from the store - nid, err := networkIDFromEndpointKey(key, ep) - if err != nil { - return err - } - n, err = c.getNetworkFromStore(nid) - if err != nil { - return err - } - if err := c.newNetworkFromStore(n); err != nil { - return err - } - n = c.networks[nid] - } _, err := n.EndpointByID(string(id)) if err != nil { @@ -170,7 +154,11 @@ func (c *controller) deleteEndpointFromStore(ep *endpoint) error { return nil } -func (c *controller) watchStore() error { +func (c *controller) watchNetworks() error { + if !c.validateDatastoreConfig() { + return nil + } + c.Lock() cs := c.store c.Unlock() @@ -179,14 +167,17 @@ func (c *controller) watchStore() error { if err != nil { return err } - epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), nil) - if err != nil { - return err - } go func() { for { select { case nws := <-nwPairs: + c.Lock() + tmpview := networkTable{} + lview := c.networks + c.Unlock() + for k, v := range lview { + tmpview[k] = v + } for _, kve := range nws { var n network err := json.Unmarshal(kve.Value, &n) @@ -194,6 +185,7 @@ func (c *controller) watchStore() error { log.Error(err) continue } + delete(tmpview, n.id) n.dbIndex = kve.LastIndex c.Lock() existing, ok := c.networks[n.id] @@ -212,31 +204,22 @@ func (c *controller) watchStore() error { if err = c.newNetworkFromStore(&n); err != nil { log.Error(err) } + } - case eps := <-epPairs: - for _, epe := range eps { - var ep endpoint - err := json.Unmarshal(epe.Value, &ep) - if err != nil { - log.Error(err) + // Delete processing + for k := range tmpview { + c.Lock() + existing, ok := c.networks[k] + c.Unlock() + if !ok { continue } - ep.dbIndex = epe.LastIndex - n, err := c.networkFromEndpointKey(epe.Key, &ep) - if err != nil { - if _, ok := err.(ErrNoSuchNetwork); !ok { - log.Error(err) - continue - } + tmp := network{} + if err := c.store.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { + continue } - if n != nil { - ep.network = n.(*network) - } - if c.processEndpointUpdate(&ep) { - err = c.newEndpointFromStore(epe.Key, &ep) - if err != nil { - log.Error(err) - } + if err := existing.deleteNetwork(); err != nil { + log.Debugf("Delete failed %s: %s", existing.name, err) } } } @@ -245,20 +228,81 @@ func (c *controller) watchStore() error { return nil } -func (c *controller) networkFromEndpointKey(key string, ep *endpoint) (Network, error) { - nid, err := networkIDFromEndpointKey(key, ep) - if err != nil { - return nil, err +func (n *network) watchEndpoints() error { + if !n.ctrlr.validateDatastoreConfig() { + return nil } - return c.NetworkByID(string(nid)) + + n.Lock() + cs := n.ctrlr.store + tmp := endpoint{network: n} + n.stopWatchCh = make(chan struct{}) + stopCh := n.stopWatchCh + n.Unlock() + + epPairs, err := cs.KVStore().WatchTree(datastore.Key(tmp.KeyPrefix()...), stopCh) + if err != nil { + return err + } + go func() { + for { + select { + case <-stopCh: + return + case eps := <-epPairs: + n.Lock() + tmpview := endpointTable{} + lview := n.endpoints + n.Unlock() + for k, v := range lview { + tmpview[k] = v + } + for _, epe := range eps { + var ep endpoint + err := json.Unmarshal(epe.Value, &ep) + if err != nil { + log.Error(err) + continue + } + delete(tmpview, ep.id) + ep.dbIndex = epe.LastIndex + ep.network = n + if n.ctrlr.processEndpointUpdate(&ep) { + err = n.ctrlr.newEndpointFromStore(epe.Key, &ep) + if err != nil { + log.Error(err) + } + } + } + // Delete processing + for k := range tmpview { + n.Lock() + existing, ok := n.endpoints[k] + n.Unlock() + if !ok { + continue + } + tmp := endpoint{} + if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { + continue + } + if err := existing.deleteEndpoint(); err != nil { + log.Debugf("Delete failed %s: %s", existing.name, err) + } + } + } + } + }() + return nil } -func networkIDFromEndpointKey(key string, ep *endpoint) (types.UUID, error) { - eKey, err := datastore.ParseKey(key) - if err != nil { - return types.UUID(""), err +func (n *network) stopWatch() { + n.Lock() + if n.stopWatchCh != nil { + close(n.stopWatchCh) + n.stopWatchCh = nil } - return ep.networkIDFromKey(eKey) + n.Unlock() } func (c *controller) processEndpointUpdate(ep *endpoint) bool {