Various refactor and fixes for the previous two commits including:

1. Don't save localscope endpoints to localstore for now.
2. Add common function updateToStore/deleteFromStore to store KVObjects.
3. Merge `getNetworksFromGlobalStore` and `getNetworksFromLocalStore`
4. Add `n.isGlobalScoped` before `n.watchEndpoints` in `addNetwork`
5. Fix integration-tests
6. Fix test failure in drivers/remote/driver_test.go
7. Restore network to store if deleteNework failed
This commit is contained in:
Chun Chen 2015-09-16 19:42:35 +08:00
parent 8babc3d4d3
commit 72567c355b
10 changed files with 167 additions and 98 deletions

View File

@ -256,7 +256,7 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
return nil, err return nil, err
} }
if err := c.updateNetworkToStore(network); err != nil { if err := c.updateToStore(network); err != nil {
log.Warnf("couldnt create network %s: %v", network.name, err) log.Warnf("couldnt create network %s: %v", network.name, err)
if e := network.Delete(); e != nil { if e := network.Delete(); e != nil {
log.Warnf("couldnt cleanup network %s: %v", network.name, err) log.Warnf("couldnt cleanup network %s: %v", network.name, err)
@ -293,8 +293,10 @@ func (c *controller) addNetwork(n *network) error {
if err := d.CreateNetwork(n.id, n.generic); err != nil { if err := d.CreateNetwork(n.id, n.generic); err != nil {
return err return err
} }
if err := n.watchEndpoints(); err != nil { if n.isGlobalScoped() {
return err if err := n.watchEndpoints(); err != nil {
return err
}
} }
c.Lock() c.Lock()
c.networks[n.id] = n c.networks[n.id] = n

View File

@ -29,9 +29,9 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} {
config := make(map[string]interface{}) config := make(map[string]interface{})
if c.validateDatastoreConfig() { if c.validateGlobalStoreConfig() {
config[netlabel.KVProvider] = c.cfg.Datastore.Client.Provider config[netlabel.KVProvider] = c.cfg.GlobalStore.Client.Provider
config[netlabel.KVProviderURL] = c.cfg.Datastore.Client.Address config[netlabel.KVProviderURL] = c.cfg.GlobalStore.Client.Address
} }
for _, label := range c.cfg.Daemon.Labels { for _, label := range c.cfg.Daemon.Labels {

View File

@ -9,6 +9,7 @@ import (
"testing" "testing"
"github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/plugins"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/driverapi"
_ "github.com/docker/libnetwork/testutils" _ "github.com/docker/libnetwork/testutils"
"github.com/docker/libnetwork/types" "github.com/docker/libnetwork/types"
@ -205,8 +206,8 @@ func TestGetExtraCapabilities(t *testing.T) {
c, err := d.(*driver).getCapabilities() c, err := d.(*driver).getCapabilities()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else if c.Scope != driverapi.LocalScope { } else if c.DataScope != datastore.LocalScope {
t.Fatalf("get capability '%s', expecting 'local'", c.Scope) t.Fatalf("get capability '%s', expecting 'local'", c.DataScope)
} }
} }
@ -343,8 +344,8 @@ func TestRemoteDriver(t *testing.T) {
c, err := d.(*driver).getCapabilities() c, err := d.(*driver).getCapabilities()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else if c.Scope != driverapi.GlobalScope { } else if c.DataScope != datastore.GlobalScope {
t.Fatalf("get capability '%s', expecting 'global'", c.Scope) t.Fatalf("get capability '%s', expecting 'global'", c.DataScope)
} }
netID := "dummy-network" netID := "dummy-network"

View File

@ -282,8 +282,10 @@ func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
return err return err
} }
if err = network.ctrlr.updateEndpointToStore(ep); err != nil { if !ep.isLocalScoped() {
return err if err = network.ctrlr.updateToStore(ep); err != nil {
return err
}
} }
sb.Lock() sb.Lock()
@ -355,11 +357,13 @@ func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
d := n.driver d := n.driver
n.Unlock() n.Unlock()
if err := c.updateEndpointToStore(ep); err != nil { if !ep.isLocalScoped() {
ep.Lock() if err := c.updateToStore(ep); err != nil {
ep.sandboxID = sid ep.Lock()
ep.Unlock() ep.sandboxID = sid
return err ep.Unlock()
return err
}
} }
if err := d.Leave(n.id, ep.id); err != nil { if err := d.Leave(n.id, ep.id); err != nil {
@ -395,27 +399,31 @@ func (ep *endpoint) Delete() error {
n.Unlock() n.Unlock()
ep.Unlock() ep.Unlock()
if err = ctrlr.deleteEndpointFromStore(ep); err != nil { if !ep.isLocalScoped() {
return err if err = ctrlr.deleteFromStore(ep); err != nil {
return err
}
} }
defer func() { defer func() {
if err != nil { if err != nil {
ep.SetIndex(0) ep.dbExists = false
if e := ctrlr.updateEndpointToStore(ep); e != nil { if !ep.isLocalScoped() {
log.Warnf("failed to recreate endpoint in store %s : %v", name, err) if e := ctrlr.updateToStore(ep); e != nil {
log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
}
} }
} }
}() }()
// Update the endpoint count in network and update it in the datastore // Update the endpoint count in network and update it in the datastore
n.DecEndpointCnt() n.DecEndpointCnt()
if err = ctrlr.updateNetworkToStore(n); err != nil { if err = ctrlr.updateToStore(n); err != nil {
return err return err
} }
defer func() { defer func() {
if err != nil { if err != nil {
n.IncEndpointCnt() n.IncEndpointCnt()
if e := ctrlr.updateNetworkToStore(n); e != nil { if e := ctrlr.updateToStore(n); e != nil {
log.Warnf("failed to update network %s : %v", n.name, e) log.Warnf("failed to update network %s : %v", n.name, e)
} }
} }
@ -547,3 +555,7 @@ func (ep *endpoint) DataScope() datastore.DataScope {
defer ep.Unlock() defer ep.Unlock()
return ep.network.dataScope return ep.network.dataScope
} }
func (ep *endpoint) isLocalScoped() bool {
return ep.DataScope() == datastore.LocalScope
}

View File

@ -47,6 +47,7 @@ func TestMain(m *testing.M) {
} }
if err := createController(); err != nil { if err := createController(); err != nil {
log.Errorf("Error creating controller: %v", err)
os.Exit(1) os.Exit(1)
} }
@ -65,7 +66,11 @@ func createController() error {
genericOption := make(map[string]interface{}) genericOption := make(map[string]interface{})
genericOption[netlabel.GenericData] = option genericOption[netlabel.GenericData] = option
controller, err = libnetwork.New(config.OptionDriverConfig(bridgeNetType, genericOption)) cfgOptions, err := libnetwork.OptionBoltdbWithRandomDBFile()
if err != nil {
return err
}
controller, err = libnetwork.New(append(cfgOptions, config.OptionDriverConfig(bridgeNetType, genericOption))...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -240,13 +240,22 @@ func (n *network) Delete() error {
// deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help // deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help
// prevent any possible race between endpoint join and network delete // prevent any possible race between endpoint join and network delete
if err = ctrlr.deleteNetworkFromStore(n); err != nil { if err = ctrlr.deleteFromStore(n); err != nil {
if err == datastore.ErrKeyModified { if err == datastore.ErrKeyModified {
return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.") return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
} }
return err return err
} }
defer func() {
if err != nil {
n.dbExists = false
if e := ctrlr.updateToStore(n); e != nil {
log.Warnf("failed to recreate network in store %s : %v", n.name, e)
}
}
}()
if err = n.deleteNetwork(); err != nil { if err = n.deleteNetwork(); err != nil {
return err return err
} }
@ -322,13 +331,13 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
n.Unlock() n.Unlock()
n.IncEndpointCnt() n.IncEndpointCnt()
if err = ctrlr.updateNetworkToStore(n); err != nil { if err = ctrlr.updateToStore(n); err != nil {
return nil, err return nil, err
} }
defer func() { defer func() {
if err != nil { if err != nil {
n.DecEndpointCnt() n.DecEndpointCnt()
if err = ctrlr.updateNetworkToStore(n); err != nil { if err = ctrlr.updateToStore(n); err != nil {
log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err) log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err)
} }
} }
@ -344,8 +353,10 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
} }
}() }()
if err = ctrlr.updateEndpointToStore(ep); err != nil { if !ep.isLocalScoped() {
return nil, err if err = ctrlr.updateToStore(ep); err != nil {
return nil, err
}
} }
return ep, nil return ep, nil

View File

@ -23,7 +23,11 @@ func getTestEnv(t *testing.T) (NetworkController, Network, Network) {
genericOption := make(map[string]interface{}) genericOption := make(map[string]interface{})
genericOption[netlabel.GenericData] = option genericOption[netlabel.GenericData] = option
c, err := New(config.OptionDriverConfig(netType, genericOption)) cfgOptions, err := OptionBoltdbWithRandomDBFile()
if err != nil {
t.Fatal(err)
}
c, err := New(append(cfgOptions, config.OptionDriverConfig(netType, genericOption))...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -43,7 +43,7 @@ func (c *controller) initGlobalStore() error {
c.globalStore = store c.globalStore = store
c.Unlock() c.Unlock()
nws, err := c.getNetworksFromGlobalStore() nws, err := c.getNetworksFromStore(true)
if err == nil { if err == nil {
c.processNetworkUpdate(nws, nil) c.processNetworkUpdate(nws, nil)
} else if err != datastore.ErrKeyNotFound { } else if err != datastore.ErrKeyNotFound {
@ -64,24 +64,23 @@ func (c *controller) initLocalStore() error {
c.localStore = localStore c.localStore = localStore
c.Unlock() c.Unlock()
nws, err := c.getNetworksFromLocalStore() nws, err := c.getNetworksFromStore(false)
if err == nil { if err == nil {
c.processNetworkUpdate(nws, nil) c.processNetworkUpdate(nws, nil)
} else if err != datastore.ErrKeyNotFound { } else if err != datastore.ErrKeyNotFound {
log.Warnf("failed to read networks from localstore during init : %v", err) log.Warnf("failed to read networks from localstore during init : %v", err)
} }
eps, err := c.getEndpointsFromLocalStore()
if err == nil {
c.processEndpointsUpdate(eps, nil)
} else if err != datastore.ErrKeyNotFound {
log.Warnf("failed to read endpoints from localstore during init : %v", err)
}
return nil return nil
} }
func (c *controller) getNetworksFromGlobalStore() ([]*store.KVPair, error) { func (c *controller) getNetworksFromStore(global bool) ([]*store.KVPair, error) {
var cs datastore.DataStore
c.Lock() c.Lock()
cs := c.globalStore if global {
cs = c.globalStore
} else {
cs = c.localStore
}
c.Unlock() c.Unlock()
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix)) return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
} }
@ -95,30 +94,6 @@ func (c *controller) newNetworkFromStore(n *network) error {
return c.addNetwork(n) return c.addNetwork(n)
} }
func (c *controller) updateNetworkToStore(n *network) error {
cs := c.getDataStore(n.DataScope())
if cs == nil {
log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
return nil
}
return cs.PutObjectAtomic(n)
}
func (c *controller) deleteNetworkFromStore(n *network) error {
cs := c.getDataStore(n.DataScope())
if cs == nil {
log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name())
return nil
}
if err := cs.DeleteObjectAtomic(n); err != nil {
return err
}
return nil
}
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
ep.Lock() ep.Lock()
n := ep.network n := ep.network
@ -134,27 +109,24 @@ func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
return err return err
} }
func (c *controller) updateEndpointToStore(ep *endpoint) error { func (c *controller) updateToStore(kvObject datastore.KV) error {
ep.Lock() cs := c.getDataStore(kvObject.DataScope())
name := ep.name
ep.Unlock()
cs := c.getDataStore(ep.DataScope())
if cs == nil { if cs == nil {
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name) log.Debugf("datastore not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...))
return nil return nil
} }
return cs.PutObjectAtomic(ep) return cs.PutObjectAtomic(kvObject)
} }
func (c *controller) deleteEndpointFromStore(ep *endpoint) error { func (c *controller) deleteFromStore(kvObject datastore.KV) error {
cs := c.getDataStore(ep.DataScope()) cs := c.getDataStore(kvObject.DataScope())
if cs == nil { if cs == nil {
log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name()) log.Debugf("datastore not initialized. kv object %s is not deleted from datastore", datastore.Key(kvObject.Key()...))
return nil return nil
} }
if err := cs.DeleteObjectAtomic(ep); err != nil { if err := cs.DeleteObjectAtomic(kvObject); err != nil {
return err return err
} }
@ -367,13 +339,6 @@ func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCf
return &defaultLocalStoreConfig return &defaultLocalStoreConfig
} }
func (c *controller) getNetworksFromLocalStore() ([]*store.KVPair, error) {
c.Lock()
cs := c.localStore
c.Unlock()
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
}
func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) { func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) {
c.Lock() c.Lock()
if dataScope == datastore.GlobalScope { if dataScope == datastore.GlobalScope {
@ -385,13 +350,6 @@ func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore data
return return
} }
func (c *controller) getEndpointsFromLocalStore() ([]*store.KVPair, error) {
c.Lock()
cs := c.localStore
c.Unlock()
return cs.KVStore().List(datastore.Key(datastore.EndpointKeyPrefix))
}
func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) { func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) {
for _, epe := range eps { for _, epe := range eps {
var ep endpoint var ep endpoint

View File

@ -1,20 +1,96 @@
package libnetwork package libnetwork
import ( import (
"fmt"
"io/ioutil"
"os"
"testing" "testing"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config" "github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
) )
func TestZooKeeperBackend(t *testing.T) { func TestZooKeeperBackend(t *testing.T) {
testNewController(t, "zk", "127.0.0.1:2181") if err := testNewController(t, "zk", "127.0.0.1:2181"); err != nil {
t.Fatal(err)
}
} }
func testNewController(t *testing.T, provider, url string) error { func testNewController(t *testing.T, provider, url string) error {
netOptions := []config.Option{} cfgOptions, err := OptionBoltdbWithRandomDBFile()
netOptions = append(netOptions, config.OptionKVProvider(provider)) if err != nil {
netOptions = append(netOptions, config.OptionKVProviderURL(url)) return err
}
_, err := New(netOptions...) cfgOptions = append(cfgOptions, config.OptionKVProvider(provider))
cfgOptions = append(cfgOptions, config.OptionKVProviderURL(url))
_, err = New(cfgOptions...)
return err return err
} }
func TestBoltdbBackend(t *testing.T) {
defer os.Remove(defaultLocalStoreConfig.Client.Address)
testLocalBackend(t, "", "", nil)
defer os.Remove("/tmp/boltdb.db")
testLocalBackend(t, "boltdb", "/tmp/boltdb.db", &store.Config{Bucket: "testBackend"})
}
func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Config) {
cfgOptions := []config.Option{}
cfgOptions = append(cfgOptions, config.OptionLocalKVProvider(provider))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(url))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(storeConfig))
driverOptions := options.Generic{}
genericOption := make(map[string]interface{})
genericOption[netlabel.GenericData] = driverOptions
cfgOptions = append(cfgOptions, config.OptionDriverConfig("host", genericOption))
ctrl, err := New(cfgOptions...)
if err != nil {
t.Fatalf("Error new controller: %v", err)
}
nw, err := ctrl.NewNetwork("host", "host")
if err != nil {
t.Fatalf("Error creating default \"host\" network: %v", err)
}
ep, err := nw.CreateEndpoint("newendpoint", []EndpointOption{}...)
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.(*controller).localStore.KVStore()
if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); !exists || err != nil {
t.Fatalf("Network key should have been created.")
}
if exists, err := store.Exists(datastore.Key([]string{datastore.EndpointKeyPrefix, string(nw.ID()), string(ep.ID())}...)); exists || err != nil {
t.Fatalf("Endpoint key shouldn't have been created.")
}
store.Close()
// test restore of local store
ctrl, err = New(cfgOptions...)
if err != nil {
t.Fatalf("Error creating controller: %v", err)
}
if _, err = ctrl.NetworkByID(nw.ID()); err != nil {
t.Fatalf("Error getting network %v", err)
}
}
// OptionBoltdbWithRandomDBFile function returns a random dir for local store backend
func OptionBoltdbWithRandomDBFile() ([]config.Option, error) {
tmp, err := ioutil.TempFile("", "libnetwork-")
if err != nil {
return nil, fmt.Errorf("Error creating temp file: %v", err)
}
if err := tmp.Close(); err != nil {
return nil, fmt.Errorf("Error closing temp file: %v", err)
}
cfgOptions := []config.Option{}
cfgOptions = append(cfgOptions, config.OptionLocalKVProvider("boltdb"))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(tmp.Name()))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(&store.Config{Bucket: "testBackend"}))
return cfgOptions, nil
}

View File

@ -50,9 +50,9 @@ title = "LibNetwork Configuration file"
[daemon] [daemon]
debug = false debug = false
labels = ["com.docker.network.driver.overlay.bind_interface=eth0"] labels = ["com.docker.network.driver.overlay.bind_interface=eth0"]
[datastore] [globalstore]
embedded = false embedded = false
[datastore.client] [globalstore.client]
provider = "consul" provider = "consul"
Address = "${bridge_ip}:8500" Address = "${bridge_ip}:8500"
EOF EOF