Distributed delete processing

Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Madhu Venugopal 2015-06-17 09:13:31 -07:00
parent 9ec67edb66
commit 2677a461de
6 changed files with 107 additions and 64 deletions

View File

@ -263,6 +263,9 @@ func (c *controller) addNetwork(n *network) error {
if err := d.CreateNetwork(n.id, n.generic); err != nil {
return err
}
if err := n.watchEndpoints(); err != nil {
return err
}
c.Lock()
c.networks[n.id] = n
c.Unlock()

View File

@ -30,7 +30,10 @@ type DataStore interface {
}
// ErrKeyModified is raised for an atomic update when the update is working on a stale state
var ErrKeyModified = store.ErrKeyModified
var (
ErrKeyModified = store.ErrKeyModified
ErrKeyNotFound = store.ErrKeyNotFound
)
type datastore struct {
store store.Store

View File

@ -554,7 +554,7 @@ func (ep *endpoint) deleteEndpoint() error {
_, ok := n.endpoints[epid]
if !ok {
n.Unlock()
return &UnknownEndpointError{name: name, id: string(epid)}
return nil
}
nid := n.id

View File

@ -547,15 +547,6 @@ func TestUnknownEndpoint(t *testing.T) {
t.Fatal(err)
}
err = ep.Delete()
if err == nil {
t.Fatal("Expected to fail. But instead succeeded")
}
if _, ok := err.(*libnetwork.UnknownEndpointError); !ok {
t.Fatalf("Did not fail with expected error. Actual error: %v", err)
}
// Done testing. Now cleanup
if err := network.Delete(); err != nil {
t.Fatal(err)

View File

@ -62,6 +62,7 @@ type network struct {
endpoints endpointTable
generic options.Generic
dbIndex uint64
stopWatchCh chan struct{}
sync.Mutex
}
@ -248,6 +249,7 @@ func (n *network) deleteNetwork() error {
}
log.Warnf("driver error deleting network %s : %v", n.name, err)
}
n.stopWatch()
return nil
}

View File

@ -31,7 +31,7 @@ func (c *controller) initDataStore() error {
c.Lock()
c.store = store
c.Unlock()
return c.watchStore()
return c.watchNetworks()
}
func (c *controller) newNetworkFromStore(n *network) error {
@ -92,22 +92,6 @@ func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
n := ep.network
id := ep.id
ep.Unlock()
if n == nil {
// Possibly the watch event for the network has not shown up yet
// Try to get network from the store
nid, err := networkIDFromEndpointKey(key, ep)
if err != nil {
return err
}
n, err = c.getNetworkFromStore(nid)
if err != nil {
return err
}
if err := c.newNetworkFromStore(n); err != nil {
return err
}
n = c.networks[nid]
}
_, err := n.EndpointByID(string(id))
if err != nil {
@ -170,7 +154,11 @@ func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
return nil
}
func (c *controller) watchStore() error {
func (c *controller) watchNetworks() error {
if !c.validateDatastoreConfig() {
return nil
}
c.Lock()
cs := c.store
c.Unlock()
@ -179,14 +167,17 @@ func (c *controller) watchStore() error {
if err != nil {
return err
}
epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), nil)
if err != nil {
return err
}
go func() {
for {
select {
case nws := <-nwPairs:
c.Lock()
tmpview := networkTable{}
lview := c.networks
c.Unlock()
for k, v := range lview {
tmpview[k] = v
}
for _, kve := range nws {
var n network
err := json.Unmarshal(kve.Value, &n)
@ -194,6 +185,7 @@ func (c *controller) watchStore() error {
log.Error(err)
continue
}
delete(tmpview, n.id)
n.dbIndex = kve.LastIndex
c.Lock()
existing, ok := c.networks[n.id]
@ -212,31 +204,22 @@ func (c *controller) watchStore() error {
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
}
}
case eps := <-epPairs:
for _, epe := range eps {
var ep endpoint
err := json.Unmarshal(epe.Value, &ep)
if err != nil {
log.Error(err)
// Delete processing
for k := range tmpview {
c.Lock()
existing, ok := c.networks[k]
c.Unlock()
if !ok {
continue
}
ep.dbIndex = epe.LastIndex
n, err := c.networkFromEndpointKey(epe.Key, &ep)
if err != nil {
if _, ok := err.(ErrNoSuchNetwork); !ok {
log.Error(err)
continue
}
tmp := network{}
if err := c.store.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
continue
}
if n != nil {
ep.network = n.(*network)
}
if c.processEndpointUpdate(&ep) {
err = c.newEndpointFromStore(epe.Key, &ep)
if err != nil {
log.Error(err)
}
if err := existing.deleteNetwork(); err != nil {
log.Debugf("Delete failed %s: %s", existing.name, err)
}
}
}
@ -245,20 +228,81 @@ func (c *controller) watchStore() error {
return nil
}
func (c *controller) networkFromEndpointKey(key string, ep *endpoint) (Network, error) {
nid, err := networkIDFromEndpointKey(key, ep)
if err != nil {
return nil, err
func (n *network) watchEndpoints() error {
if !n.ctrlr.validateDatastoreConfig() {
return nil
}
return c.NetworkByID(string(nid))
n.Lock()
cs := n.ctrlr.store
tmp := endpoint{network: n}
n.stopWatchCh = make(chan struct{})
stopCh := n.stopWatchCh
n.Unlock()
epPairs, err := cs.KVStore().WatchTree(datastore.Key(tmp.KeyPrefix()...), stopCh)
if err != nil {
return err
}
go func() {
for {
select {
case <-stopCh:
return
case eps := <-epPairs:
n.Lock()
tmpview := endpointTable{}
lview := n.endpoints
n.Unlock()
for k, v := range lview {
tmpview[k] = v
}
for _, epe := range eps {
var ep endpoint
err := json.Unmarshal(epe.Value, &ep)
if err != nil {
log.Error(err)
continue
}
delete(tmpview, ep.id)
ep.dbIndex = epe.LastIndex
ep.network = n
if n.ctrlr.processEndpointUpdate(&ep) {
err = n.ctrlr.newEndpointFromStore(epe.Key, &ep)
if err != nil {
log.Error(err)
}
}
}
// Delete processing
for k := range tmpview {
n.Lock()
existing, ok := n.endpoints[k]
n.Unlock()
if !ok {
continue
}
tmp := endpoint{}
if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
continue
}
if err := existing.deleteEndpoint(); err != nil {
log.Debugf("Delete failed %s: %s", existing.name, err)
}
}
}
}
}()
return nil
}
func networkIDFromEndpointKey(key string, ep *endpoint) (types.UUID, error) {
eKey, err := datastore.ParseKey(key)
if err != nil {
return types.UUID(""), err
func (n *network) stopWatch() {
n.Lock()
if n.stopWatchCh != nil {
close(n.stopWatchCh)
n.stopWatchCh = nil
}
return ep.networkIDFromKey(eKey)
n.Unlock()
}
func (c *controller) processEndpointUpdate(ep *endpoint) bool {