diff --git a/libnetwork/cmd/test/main.go b/libnetwork/cmd/test/main.go index 9b735528cc..951b756639 100644 --- a/libnetwork/cmd/test/main.go +++ b/libnetwork/cmd/test/main.go @@ -26,7 +26,7 @@ func main() { options := options.Generic{"AddressIPv4": net} err = controller.ConfigureNetworkDriver(netType, options) - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { netw, err := controller.NewNetwork(netType, fmt.Sprintf("Gordon-%d", i)) if err != nil { if _, ok := err.(libnetwork.NetworkNameError); !ok { @@ -46,6 +46,6 @@ func main() { log.Fatalf("Error creating endpoint 2 %v", err) } - time.Sleep(10 * time.Second) + time.Sleep(2 * time.Second) } } diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index cb37247e57..125be0b953 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -17,6 +17,12 @@ type DataStore interface { PutObject(kvObject KV) error // PutObjectAtomic provides an atomic add and update operation for a Record PutObjectAtomic(kvObject KV) error + // DeleteObject deletes a record + DeleteObject(kvObject KV) error + // DeleteObjectAtomic performs an atomic delete operation + DeleteObjectAtomic(kvObject KV) error + // DeleteTree deletes a record + DeleteTree(kvObject KV) error // KVStore returns access to the KV Store KVStore() store.Store } @@ -29,6 +35,8 @@ type datastore struct { type KV interface { // Key method lets an object to provide the Key to be used in KV Store Key() []string + // KeyPrefix method lets an object to return immediate parent key that can be used for tree walk + KeyPrefix() []string // Value method lets an object to marshal its content to be stored in the KV store Value() []byte // Index method returns the latest DB Index as seen by the object @@ -122,6 +130,7 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error { return ds.store.Put(Key(key...), kvObjValue, nil) } +// GetObject returns a record matching the key func (ds *datastore) GetObject(key string, o interface{}) error { kvPair, err := ds.store.Get(key) if err != nil { @@ -129,3 +138,28 @@ func (ds *datastore) GetObject(key string, o interface{}) error { } return json.Unmarshal(kvPair.Value, o) } + +// DeleteObject unconditionally deletes a record from the store +func (ds *datastore) DeleteObject(kvObject KV) error { + return ds.store.Delete(Key(kvObject.Key()...)) +} + +// DeleteObjectAtomic performs atomic delete on a record +func (ds *datastore) DeleteObjectAtomic(kvObject KV) error { + if kvObject == nil { + return errors.New("kvObject is nil") + } + + previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()} + _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous) + if err != nil { + return err + } + + return nil +} + +// DeleteTree unconditionally deletes a record from the store +func (ds *datastore) DeleteTree(kvObject KV) error { + return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...)) +} diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index cf1634d853..d4bb7a66a2 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -122,6 +122,9 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) { epMap["ep_iface"] = ep.iFaces epMap["exposed_ports"] = ep.exposedPorts epMap["generic"] = ep.generic + if ep.container != nil { + epMap["container"] = ep.container + } return json.Marshal(epMap) } @@ -151,6 +154,14 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { json.Unmarshal(tb, &tPorts) ep.exposedPorts = tPorts + epc, ok := epMap["container"] + if ok { + cb, _ := json.Marshal(epc) + var cInfo containerInfo + json.Unmarshal(cb, &cInfo) + ep.container = &cInfo + } + if epMap["generic"] != nil { ep.generic = epMap["generic"].(map[string]interface{}) } @@ -184,6 +195,10 @@ func (ep *endpoint) Key() []string { return []string{datastore.EndpointKeyPrefix, string(ep.network.id), string(ep.id)} } +func (ep *endpoint) KeyPrefix() []string { + return []string{datastore.EndpointKeyPrefix, string(ep.network.id)} +} + func (ep *endpoint) Value() []byte { b, err := json.Marshal(ep) if err != nil { @@ -302,11 +317,13 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error { ep.Unlock() defer func() { - ep.Lock() if err != nil { + ep.Lock() ep.container = nil + ep.Unlock() + } else { + ep.network.ctrlr.addEndpointToStore(ep) } - ep.Unlock() }() network.Lock() @@ -410,14 +427,28 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error { func (ep *endpoint) Delete() error { ep.Lock() - epid := ep.id - name := ep.name if ep.container != nil { ep.Unlock() - return &ActiveContainerError{name: name, id: string(epid)} + return &ActiveContainerError{name: ep.name, id: string(ep.id)} + } + ep.Unlock() + + if err := ep.deleteEndpoint(); err != nil { + return err } + if err := ep.network.ctrlr.deleteEndpointFromStore(ep); err != nil { + return err + } + return nil +} + +func (ep *endpoint) deleteEndpoint() error { + var err error + ep.Lock() n := ep.network + name := ep.name + epid := ep.id ep.Unlock() n.Lock() diff --git a/libnetwork/network.go b/libnetwork/network.go index ee66ce799d..2dec288f40 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -93,6 +93,10 @@ func (n *network) Key() []string { return []string{datastore.NetworkKeyPrefix, string(n.id)} } +func (n *network) KeyPrefix() []string { + return []string{datastore.NetworkKeyPrefix} +} + func (n *network) Value() []byte { b, err := json.Marshal(n) if err != nil { @@ -175,7 +179,22 @@ func (n *network) Delete() error { n.ctrlr.Unlock() return &ActiveEndpointsError{name: n.name, id: string(n.id)} } + n.ctrlr.Unlock() + if err = n.deleteNetwork(); err != nil { + return err + } + + if err = n.ctrlr.deleteNetworkFromStore(n); err != nil { + log.Warnf("Delete network (%s - %v) failed from datastore : %v", n.name, n.id, err) + } + + return nil +} + +func (n *network) deleteNetwork() error { + var err error + n.ctrlr.Lock() delete(n.ctrlr.networks, n.id) n.ctrlr.Unlock() if err := n.driver.DeleteNetwork(n.id); err != nil { diff --git a/libnetwork/store.go b/libnetwork/store.go index 5cdcfde10f..4fe98d233c 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -52,6 +52,29 @@ func (c *controller) addNetworkToStore(n *network) error { return cs.PutObject(n) } +func (c *controller) deleteNetworkFromStore(n *network) error { + if isReservedNetwork(n.Name()) { + return nil + } + c.Lock() + cs := c.store + c.Unlock() + if cs == nil { + log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name()) + return nil + } + + if err := cs.DeleteObject(n); err != nil { + return err + } + + if err := cs.DeleteTree(&endpoint{network: n}); err != nil { + return err + } + + return nil +} + func (c *controller) getNetworkFromStore(nid types.UUID) (*network, error) { n := network{id: nid} if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil { @@ -88,9 +111,11 @@ func (c *controller) newEndpointFromStore(ep *endpoint) { } func (c *controller) addEndpointToStore(ep *endpoint) error { + ep.Lock() if isReservedNetwork(ep.network.name) { return nil } + ep.Unlock() c.Lock() cs := c.store c.Unlock() @@ -114,6 +139,25 @@ func (c *controller) getEndpointFromStore(eid types.UUID) (*endpoint, error) { return &ep, nil } +func (c *controller) deleteEndpointFromStore(ep *endpoint) error { + if isReservedNetwork(ep.network.Name()) { + return nil + } + c.Lock() + cs := c.store + c.Unlock() + if cs == nil { + log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name()) + return nil + } + + if err := cs.DeleteObject(ep); err != nil { + return err + } + + return nil +} + func (c *controller) watchStore() error { c.Lock() cs := c.store @@ -145,7 +189,7 @@ func (c *controller) watchStore() error { if ok { // Skip existing network update if existing.dbIndex != n.dbIndex { - log.Debugf("Skipping network update for %s (%s)", n.name, n.id) + existing.dbIndex = n.dbIndex } continue } @@ -170,9 +214,11 @@ func (c *controller) watchStore() error { if ok { existing, _ := n.EndpointByID(string(ep.id)) if existing != nil { + ee := existing.(*endpoint) // Skip existing endpoint update - if existing.(*endpoint).dbIndex != ep.dbIndex { - log.Debugf("Skipping endpoint update for %s (%s)", ep.name, ep.id) + if ee.dbIndex != ep.dbIndex { + ee.dbIndex = ep.dbIndex + ee.container = ep.container } continue }