Reading the top level element (network) from datastore on init

Currently we rely on watch to catchup after the init. But there could be
a small time window on which, we might end up in a race condition on
network creates. By reading and populating networks during init, we
avoid any such conditions, especially for default network handling.

Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Madhu Venugopal 2015-06-18 08:18:17 -07:00
parent 1e1eaf5937
commit 8b59f48537
2 changed files with 51 additions and 30 deletions

View File

@ -2,10 +2,11 @@ title = "LibNetwork Configuration file"
[daemon]
debug = false
DefaultNetwork = "bridge"
DefaultDriver = "bridge"
[cluster]
discovery = "token://22aa23948f4f6b31230687689636959e"
Address = "1.1.1.1"
[datastore]
embedded = false
[datastore.client]
provider = "consul"
Address = "localhost:8500"

View File

@ -5,6 +5,7 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/types"
)
@ -31,9 +32,23 @@ func (c *controller) initDataStore() error {
c.Lock()
c.store = store
c.Unlock()
nws, err := c.getNetworksFromStore()
if err == nil {
c.processNetworkUpdate(nws, nil)
} else if err != datastore.ErrKeyNotFound {
log.Warnf("failed to read networks from datastore during init : %v", err)
}
return c.watchNetworks()
}
func (c *controller) getNetworksFromStore() ([]*store.KVPair, error) {
c.Lock()
cs := c.store
c.Unlock()
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
}
func (c *controller) newNetworkFromStore(n *network) error {
n.Lock()
n.ctrlr = c
@ -178,34 +193,7 @@ func (c *controller) watchNetworks() error {
for k, v := range lview {
tmpview[k] = v
}
for _, kve := range nws {
var n network
err := json.Unmarshal(kve.Value, &n)
if err != nil {
log.Error(err)
continue
}
delete(tmpview, n.id)
n.dbIndex = kve.LastIndex
c.Lock()
existing, ok := c.networks[n.id]
c.Unlock()
if ok {
existing.Lock()
// Skip existing network update
if existing.dbIndex != n.dbIndex {
existing.dbIndex = n.dbIndex
existing.endpointCnt = n.endpointCnt
}
existing.Unlock()
continue
}
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
}
}
c.processNetworkUpdate(nws, &tmpview)
// Delete processing
for k := range tmpview {
c.Lock()
@ -305,6 +293,38 @@ func (n *network) stopWatch() {
n.Unlock()
}
func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) {
for _, kve := range nws {
var n network
err := json.Unmarshal(kve.Value, &n)
if err != nil {
log.Error(err)
continue
}
if prune != nil {
delete(*prune, n.id)
}
n.dbIndex = kve.LastIndex
c.Lock()
existing, ok := c.networks[n.id]
c.Unlock()
if ok {
existing.Lock()
// Skip existing network update
if existing.dbIndex != n.dbIndex {
existing.dbIndex = n.dbIndex
existing.endpointCnt = n.endpointCnt
}
existing.Unlock()
continue
}
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
}
}
}
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
nw := ep.network
if nw == nil {