Merge pull request #594 from mrjana/model

Remove watch on all libnetwork objects
Madhu Venugopal committed 2015-10-06 14:46:48 -07:00
commit f77bdb6589
33 changed files with 2170 additions and 1161 deletions
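The central change runs through every hunk below: instead of registering a KV-store watch per object (bitseq handles, networks, endpoints) and reconciling on notifications, each mutating path now re-reads the latest state from the store, applies the change, and writes it back atomically, retrying on conflict. A minimal sketch of that pattern, assuming a hypothetical updateObject helper and mutate callback (only the datastore package names are from this diff):

package sketch

import "github.com/docker/libnetwork/datastore"

// updateObject sketches the read-modify-write loop that replaces the
// removed watches: fetch the latest copy (and its DB index), mutate it
// locally, then attempt an atomic put; ErrKeyModified means another
// writer won the race, so loop around and retry on fresh state.
func updateObject(ds datastore.DataStore, obj datastore.KVObject, mutate func(datastore.KVObject)) error {
	for {
		if err := ds.GetObject(datastore.Key(obj.Key()...), obj); err != nil &&
			err != datastore.ErrKeyNotFound {
			return err
		}
		mutate(obj)
		if err := ds.PutObjectAtomic(obj); err == nil || err != datastore.ErrKeyModified {
			return err
		}
		// Stale index: another writer updated the object; retry.
	}
}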

View File

@ -14,6 +14,7 @@ import (
"github.com/docker/docker/pkg/reexec"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
"github.com/docker/libnetwork/testutils"
@ -88,11 +89,13 @@ func i2sbL(i interface{}) []*sandboxResource {
}
func createTestNetwork(t *testing.T, network string) (libnetwork.NetworkController, libnetwork.Network) {
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
}
defer c.Stop()
netOption := options.Generic{
netlabel.GenericData: options.Generic{
@ -175,6 +178,9 @@ func TestJson(t *testing.T) {
func TestCreateDeleteNetwork(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -249,6 +255,9 @@ func TestCreateDeleteNetwork(t *testing.T) {
func TestGetNetworksAndEndpoints(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -518,6 +527,9 @@ func TestGetNetworksAndEndpoints(t *testing.T) {
func TestProcGetServices(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -686,6 +698,7 @@ func TestProcGetService(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
ep1, err := nw.CreateEndpoint("db")
if err != nil {
t.Fatal(err)
@ -738,6 +751,8 @@ func TestProcPublishUnpublishService(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, _ := createTestNetwork(t, "network")
defer c.Stop()
vars := make(map[string]string)
vbad, err := json.Marshal("bad service create data")
@ -870,6 +885,7 @@ func TestAttachDetachBackend(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
ep1, err := nw.CreateEndpoint("db")
if err != nil {
t.Fatal(err)
@ -994,6 +1010,9 @@ func TestAttachDetachBackend(t *testing.T) {
}
func TestDetectGetNetworksInvalidQueryComposition(t *testing.T) {
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1011,6 +1030,7 @@ func TestDetectGetEndpointsInvalidQueryComposition(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, _ := createTestNetwork(t, "network")
defer c.Stop()
vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"}
_, errRsp := procGetEndpoints(c, vars, nil)
@ -1023,6 +1043,7 @@ func TestDetectGetServicesInvalidQueryComposition(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, _ := createTestNetwork(t, "network")
defer c.Stop()
vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"}
_, errRsp := procGetServices(c, vars, nil)
@ -1040,6 +1061,8 @@ func TestFindNetworkUtil(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
nid := nw.ID()
_, errRsp := findNetwork(c, "", byName)
@ -1102,6 +1125,9 @@ func TestFindNetworkUtil(t *testing.T) {
func TestCreateDeleteEndpoints(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1225,6 +1251,9 @@ func TestCreateDeleteEndpoints(t *testing.T) {
func TestJoinLeave(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1382,6 +1411,8 @@ func TestFindEndpointUtilPanic(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
defer checkPanic(t)
c, nw := createTestNetwork(t, "network")
defer c.Stop()
nid := nw.ID()
findEndpoint(c, nid, "", byID, -1)
}
@ -1390,6 +1421,8 @@ func TestFindServiceUtilPanic(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
defer checkPanic(t)
c, _ := createTestNetwork(t, "network")
defer c.Stop()
findService(c, "random_service", -1)
}
@ -1397,6 +1430,8 @@ func TestFindEndpointUtil(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
nid := nw.ID()
ep, err := nw.CreateEndpoint("secondEp", nil)
@ -1443,7 +1478,8 @@ func TestFindEndpointUtil(t *testing.T) {
t.Fatalf("Unexepected failure: %v", errRsp)
}
if ep0 != ep1 || ep0 != ep2 || ep0 != ep3 || ep0 != ep4 || ep0 != ep5 {
if ep0.ID() != ep1.ID() || ep0.ID() != ep2.ID() ||
ep0.ID() != ep3.ID() || ep0.ID() != ep4.ID() || ep0.ID() != ep5.ID() {
t.Fatalf("Diffenrent queries returned different endpoints")
}
@ -1665,6 +1701,9 @@ func TestwriteJSON(t *testing.T) {
func TestHttpHandlerUninit(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1732,6 +1771,9 @@ func TestHttpHandlerBadBody(t *testing.T) {
rsp := newWriter()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1765,6 +1807,9 @@ func TestEndToEnd(t *testing.T) {
rsp := newWriter()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -2213,6 +2258,9 @@ func TestEndToEndErrorMessage(t *testing.T) {
rsp := newWriter()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
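All of these tests now share the same two-step preamble: delete any stale local-scope boltdb file, then create a controller. A small helper capturing that setup (the helper name and package clause are hypothetical, not part of this diff):

package api

import (
	"os"
	"testing"

	"github.com/docker/libnetwork"
	"github.com/docker/libnetwork/datastore"
)

// newTestController removes any leftover local-scope boltdb file so the
// test starts from a clean local store, then creates a fresh controller,
// failing the test on error.
func newTestController(t *testing.T) libnetwork.NetworkController {
	os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
	c, err := libnetwork.New()
	if err != nil {
		t.Fatal(err)
	}
	return c
}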

View File

@ -57,9 +57,6 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32
return h, nil
}
// Register for status changes
h.watchForChanges()
// Get the initial status from the ds if present.
if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
return nil, err
@ -252,6 +249,12 @@ func (h *Handle) set(ordinal, start, end uint32, any bool, release bool) (uint32
)
for {
if h.store != nil {
if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
return ret, err
}
}
h.Lock()
// Get position if available
if release {

View File

@ -70,46 +70,47 @@ func (h *Handle) Exists() bool {
return h.dbExists
}
// New method returns a handle based on the receiver handle
func (h *Handle) New() datastore.KVObject {
h.Lock()
defer h.Unlock()
return &Handle{
app: h.app,
id: h.id,
store: h.store,
}
}
// CopyTo deep copies the handle into the passed destination object
func (h *Handle) CopyTo(o datastore.KVObject) error {
h.Lock()
defer h.Unlock()
dstH := o.(*Handle)
dstH.bits = h.bits
dstH.unselected = h.unselected
dstH.head = h.head.getCopy()
dstH.app = h.app
dstH.id = h.id
dstH.dbIndex = h.dbIndex
dstH.dbExists = h.dbExists
dstH.store = h.store
return nil
}
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
func (h *Handle) Skip() bool {
return false
}
// DataScope method returns the storage scope of the datastore
func (h *Handle) DataScope() datastore.DataScope {
return datastore.GlobalScope
}
func (h *Handle) watchForChanges() error {
func (h *Handle) DataScope() string {
h.Lock()
store := h.store
h.Unlock()
defer h.Unlock()
if store == nil {
return nil
}
kvpChan, err := store.KVStore().Watch(datastore.Key(h.Key()...), nil)
if err != nil {
return err
}
go func() {
for {
select {
case kvPair := <-kvpChan:
// Only process remote update
if kvPair != nil && (kvPair.LastIndex != h.Index()) {
err := h.fromDsValue(kvPair.Value)
if err != nil {
log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error())
} else {
h.SetIndex(kvPair.LastIndex)
}
}
}
}
}()
return nil
return h.store.Scope()
}
func (h *Handle) fromDsValue(value []byte) error {

View File

@ -22,10 +22,12 @@ import (
"github.com/docker/docker/pkg/reexec"
"github.com/Sirupsen/logrus"
psignal "github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/term"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/api"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
@ -76,6 +78,7 @@ func processConfig(cfg *config.Config) []config.Option {
if cfg == nil {
return options
}
dn := "bridge"
if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" {
dn = cfg.Daemon.DefaultNetwork
@ -91,12 +94,12 @@ func processConfig(cfg *config.Config) []config.Option {
if cfg.Daemon.Labels != nil {
options = append(options, config.OptionLabels(cfg.Daemon.Labels))
}
if strings.TrimSpace(cfg.GlobalStore.Client.Provider) != "" {
options = append(options, config.OptionKVProvider(cfg.GlobalStore.Client.Provider))
}
if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" {
options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address))
if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
options = append(options, config.OptionKVProvider(dcfg.Client.Provider))
options = append(options, config.OptionKVProviderURL(dcfg.Client.Address))
}
dOptions, err := startDiscovery(&cfg.Cluster)
if err != nil {
logrus.Infof("Skipping discovery : %s", err.Error())
@ -182,8 +185,9 @@ func createDefaultNetwork(c libnetwork.NetworkController) {
genericOption[netlabel.GenericData] = map[string]interface{}{
"BridgeName": nw,
}
networkOption := libnetwork.NetworkOptionGeneric(genericOption)
createOptions = append(createOptions, networkOption)
createOptions = append(createOptions,
libnetwork.NetworkOptionGeneric(genericOption),
libnetwork.NetworkOptionPersist(false))
}
_, err := c.NewNetwork(d, nw, createOptions...)
if err != nil {
@ -214,6 +218,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
fmt.Println("Error starting dnetDaemon :", err)
return err
}
createDefaultNetwork(controller)
httpHandler := api.NewHTTPHandler(controller)
r := mux.NewRouter().StrictSlash(false)
@ -231,10 +236,21 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
handleSignals(controller)
setupDumpStackTrap()
return http.ListenAndServe(d.addr, r)
}
func setupDumpStackTrap() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGUSR1)
go func() {
for range c {
psignal.DumpStacks()
}
}()
}
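With this trap installed, sending SIGUSR1 to a running dnet daemon (for example, kill -USR1 <pid>) dumps all goroutine stacks via the docker signal package, which is handy for diagnosing hangs in the new store-backed code paths.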
func handleSignals(controller libnetwork.NetworkController) {
c := make(chan os.Signal, 1)
signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}

View File

@ -7,19 +7,21 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/netlabel"
)
// Config encapsulates configurations of various Libnetwork components
type Config struct {
Daemon DaemonCfg
Cluster ClusterCfg
GlobalStore, LocalStore DatastoreCfg
Daemon DaemonCfg
Cluster ClusterCfg
Scopes map[string]*datastore.ScopeCfg
}
// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
@ -34,26 +36,28 @@ type ClusterCfg struct {
Heartbeat uint64
}
// DatastoreCfg represents Datastore configuration.
type DatastoreCfg struct {
Embedded bool
Client DatastoreClientCfg
}
// DatastoreClientCfg represents Datastore Client-only mode configuration
type DatastoreClientCfg struct {
Provider string
Address string
Config *store.Config
// LoadDefaultScopes loads default scope configs for scopes which
// don't have explicit user-specified configs.
func (c *Config) LoadDefaultScopes(dataDir string) {
for k, v := range datastore.DefaultScopes(dataDir) {
if _, ok := c.Scopes[k]; !ok {
c.Scopes[k] = v
}
}
}
// ParseConfig parses the libnetwork configuration file
func ParseConfig(tomlCfgFile string) (*Config, error) {
var cfg Config
if _, err := toml.DecodeFile(tomlCfgFile, &cfg); err != nil {
cfg := &Config{
Scopes: map[string]*datastore.ScopeCfg{},
}
if _, err := toml.DecodeFile(tomlCfgFile, cfg); err != nil {
return nil, err
}
return &cfg, nil
cfg.LoadDefaultScopes(cfg.Daemon.DataDir)
return cfg, nil
}
// Option is an option setter function type used to pass various configurations
@ -98,7 +102,10 @@ func OptionLabels(labels []string) Option {
func OptionKVProvider(provider string) Option {
return func(c *Config) {
log.Infof("Option OptionKVProvider: %s", provider)
c.GlobalStore.Client.Provider = strings.TrimSpace(provider)
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.GlobalScope].Client.Provider = strings.TrimSpace(provider)
}
}
@ -106,7 +113,10 @@ func OptionKVProvider(provider string) Option {
func OptionKVProviderURL(url string) Option {
return func(c *Config) {
log.Infof("Option OptionKVProviderURL: %s", url)
c.GlobalStore.Client.Address = strings.TrimSpace(url)
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.GlobalScope].Client.Address = strings.TrimSpace(url)
}
}
@ -124,6 +134,13 @@ func OptionDiscoveryAddress(address string) Option {
}
}
// OptionDataDir function returns an option setter for data folder
func OptionDataDir(dataDir string) Option {
return func(c *Config) {
c.Daemon.DataDir = dataDir
}
}
// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {
@ -145,7 +162,10 @@ func IsValidName(name string) bool {
func OptionLocalKVProvider(provider string) Option {
return func(c *Config) {
log.Infof("Option OptionLocalKVProvider: %s", provider)
c.LocalStore.Client.Provider = strings.TrimSpace(provider)
if _, ok := c.Scopes[datastore.LocalScope]; !ok {
c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.LocalScope].Client.Provider = strings.TrimSpace(provider)
}
}
@ -153,7 +173,10 @@ func OptionLocalKVProvider(provider string) Option {
func OptionLocalKVProviderURL(url string) Option {
return func(c *Config) {
log.Infof("Option OptionLocalKVProviderURL: %s", url)
c.LocalStore.Client.Address = strings.TrimSpace(url)
if _, ok := c.Scopes[datastore.LocalScope]; !ok {
c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.LocalScope].Client.Address = strings.TrimSpace(url)
}
}
@ -161,6 +184,9 @@ func OptionLocalKVProviderURL(url string) Option {
func OptionLocalKVProviderConfig(config *store.Config) Option {
return func(c *Config) {
log.Infof("Option OptionLocalKVProviderConfig: %v", config)
c.LocalStore.Client.Config = config
if _, ok := c.Scopes[datastore.LocalScope]; !ok {
c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.LocalScope].Client.Config = config
}
}
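Taken together, the option setters above let a client configure each scope independently. A usage sketch; the data directory and consul address are illustrative values, not defaults from this diff:

package sketch

import (
	"github.com/docker/libnetwork"
	"github.com/docker/libnetwork/config"
)

// newController wires one option per concern: OptionDataDir moves the
// default local boltdb store under the given directory, while the two
// KV options point the global scope at an external store.
func newController() (libnetwork.NetworkController, error) {
	return libnetwork.New(
		config.OptionDataDir("/var/lib/dnet"),
		config.OptionKVProvider("consul"),
		config.OptionKVProviderURL("localhost:8500"),
	)
}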

View File

@ -124,73 +124,71 @@ type ipamData struct {
}
type driverTable map[string]*driverData
//type networkTable map[string]*network
//type endpointTable map[string]*endpoint
type ipamTable map[string]*ipamData
type networkTable map[string]*network
type endpointTable map[string]*endpoint
type sandboxTable map[string]*sandbox
type controller struct {
id string
networks networkTable
drivers driverTable
ipamDrivers ipamTable
sandboxes sandboxTable
cfg *config.Config
globalStore, localStore datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
id string
//networks networkTable
drivers driverTable
ipamDrivers ipamTable
sandboxes sandboxTable
cfg *config.Config
stores []datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
watchCh chan *endpoint
unWatchCh chan *endpoint
svcDb map[string]svcMap
sync.Mutex
}
// New creates a new instance of network controller.
func New(cfgOptions ...config.Option) (NetworkController, error) {
var cfg *config.Config
cfg = &config.Config{
Daemon: config.DaemonCfg{
DriverCfg: make(map[string]interface{}),
},
Scopes: make(map[string]*datastore.ScopeCfg),
}
if len(cfgOptions) > 0 {
cfg = &config.Config{
Daemon: config.DaemonCfg{
DriverCfg: make(map[string]interface{}),
},
}
cfg.ProcessOptions(cfgOptions...)
}
cfg.LoadDefaultScopes(cfg.Daemon.DataDir)
c := &controller{
id: stringid.GenerateRandomID(),
cfg: cfg,
networks: networkTable{},
sandboxes: sandboxTable{},
drivers: driverTable{},
ipamDrivers: ipamTable{}}
if err := initDrivers(c); err != nil {
ipamDrivers: ipamTable{},
svcDb: make(map[string]svcMap),
}
if err := c.initStores(); err != nil {
return nil, err
}
if cfg != nil {
if err := c.initGlobalStore(); err != nil {
// Failing to initialize datastore is a bad situation to be in.
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err)
}
if err := c.initLocalStore(); err != nil {
log.Debugf("Failed to Initialize LocalDatastore due to %v.", err)
}
}
if err := initIpams(c, c.localStore, c.globalStore); err != nil {
return nil, err
}
if cfg != nil {
if err := c.restoreFromGlobalStore(); err != nil {
log.Debugf("Failed to restore from global Datastore due to %v", err)
}
if cfg != nil && cfg.Cluster.Watcher != nil {
if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil {
// Failing to initialize discovery is a bad situation to be in.
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Discovery : %v", err)
}
if err := c.restoreFromLocalStore(); err != nil {
log.Debugf("Failed to restore from local Datastore due to %v", err)
}
}
if err := initDrivers(c); err != nil {
return nil, err
}
if err := initIpams(c, c.getStore(datastore.LocalScope),
c.getStore(datastore.GlobalScope)); err != nil {
return nil, err
}
if err := c.startExternalKeyListener(); err != nil {
@ -325,15 +323,6 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
if !config.IsValidName(name) {
return nil, ErrInvalidName(name)
}
// Check if a network already exists with the specified network name
c.Lock()
for _, n := range c.networks {
if n.name == name {
c.Unlock()
return nil, NetworkNameError(name)
}
}
c.Unlock()
// Construct the network object
network := &network{
@ -342,13 +331,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
ipamType: ipamapi.DefaultIPAM,
id: stringid.GenerateRandomID(),
ctrlr: c,
endpoints: endpointTable{},
persist: true,
drvOnce: &sync.Once{},
}
network.processOptions(options...)
if _, err := c.loadNetworkDriver(network); err != nil {
// Make sure we have a driver available for this network type
// before we allocate anything.
if _, err := network.driver(); err != nil {
return nil, err
}
@ -364,7 +355,16 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
}
}()
if err = c.addNetwork(network); err != nil {
// addNetwork can be called lazily for a local-scope network when
// an endpoint is created after a restart and the network was
// created in a previous life. Make sure to wrap the driver
// notification of network creation in a Once call so that the
// driver is invoked only once even if both the network and
// endpoint creation happen in the same lifetime.
network.drvOnce.Do(func() {
err = c.addNetwork(network)
})
if err != nil {
return nil, err
}
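The drvOnce guard above relies on sync.Once semantics; a self-contained sketch of the idea under hypothetical names:

package sketch

import (
	"fmt"
	"sync"
)

// driverNotifier mimics the drvOnce field: whichever path runs first
// (explicit NewNetwork, or the lazy endpoint-driven path after a
// restart) performs the driver notification; later calls are no-ops.
type driverNotifier struct {
	once sync.Once
}

func (dn *driverNotifier) notify(create func() error) error {
	var err error
	dn.once.Do(func() { err = create() })
	return err // nil on every call after the first, like drvOnce
}

func example() {
	dn := &driverNotifier{}
	create := func() error { fmt.Println("driver.CreateNetwork"); return nil }
	_ = dn.notify(create) // runs create
	_ = dn.notify(create) // no-op within the same process lifetime
}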
@ -380,35 +380,28 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
}
func (c *controller) addNetwork(n *network) error {
if _, err := c.loadNetworkDriver(n); err != nil {
d, err := n.driver()
if err != nil {
return err
}
n.Lock()
d := n.driver
n.Unlock()
// Create the network
if err := d.CreateNetwork(n.id, n.generic, n.getIPv4Data(), n.getIPv6Data()); err != nil {
return err
}
if n.isGlobalScoped() {
if err := n.watchEndpoints(); err != nil {
return err
}
}
c.Lock()
c.networks[n.id] = n
c.Unlock()
return nil
}
func (c *controller) Networks() []Network {
c.Lock()
defer c.Unlock()
var list []Network
list := make([]Network, 0, len(c.networks))
for _, n := range c.networks {
networks, err := c.getNetworksFromStore()
if err != nil {
log.Error(err)
}
for _, n := range networks {
list = append(list, n)
}
@ -450,12 +443,13 @@ func (c *controller) NetworkByID(id string) (Network, error) {
if id == "" {
return nil, ErrInvalidID(id)
}
c.Lock()
defer c.Unlock()
if n, ok := c.networks[id]; ok {
return n, nil
n, err := c.getNetworkFromStore(id)
if err != nil {
return nil, ErrNoSuchNetwork(id)
}
return nil, ErrNoSuchNetwork(id)
return n, nil
}
// NewSandbox creates a new sandbox for the passed container id
@ -620,30 +614,7 @@ func (c *controller) getIpamDriver(name string) (ipamapi.Ipam, error) {
}
func (c *controller) Stop() {
if c.localStore != nil {
c.localStore.KVStore().Close()
}
c.closeStores()
c.stopExternalKeyListener()
osl.GC()
}
func (c *controller) loadNetworkDriver(n *network) (driverapi.Driver, error) {
// Check if a driver for the specified network type is available
c.Lock()
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok {
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
return nil, err
}
}
n.Lock()
n.svcRecords = svcMap{}
n.driver = dd.driver
n.dataScope = dd.capability.DataScope
n.Unlock()
return dd.driver, nil
}

View File

@ -0,0 +1,153 @@
package datastore
import (
"fmt"
"sync"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
)
type kvMap map[string]KVObject
type cache struct {
sync.Mutex
kmm map[string]kvMap
ds *datastore
}
func newCache(ds *datastore) *cache {
return &cache{kmm: make(map[string]kvMap), ds: ds}
}
func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
var err error
c.Lock()
keyPrefix := Key(kvObject.KeyPrefix()...)
kmap, ok := c.kmm[keyPrefix]
c.Unlock()
if ok {
return kmap, nil
}
kmap = kvMap{}
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error while populating kmap, object does not implement KVConstructor interface")
}
kvList, err := c.ds.store.List(keyPrefix)
if err != nil {
// In case of BoltDB it may return ErrBoltBucketNotFound when no writes
// have ever happened on the db bucket. So check for both err codes
if err == store.ErrKeyNotFound || err == boltdb.ErrBoltBucketNotFound {
// If the store doesn't have anything then there is nothing to
// populate in the cache. Just bail out.
goto out
}
return nil, fmt.Errorf("error while populating kmap: %v", err)
}
for _, kvPair := range kvList {
// Ignore empty kvPair values
if len(kvPair.Value) == 0 {
continue
}
dstO := ctor.New()
err = dstO.SetValue(kvPair.Value)
if err != nil {
return nil, err
}
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
dstO.SetIndex(kvPair.LastIndex)
kmap[Key(dstO.Key()...)] = dstO
}
out:
// There may be multiple goroutines racing to fill the
// cache. The one which places the kmap in c.kmm first
// wins. The others should just use what the first one populated.
c.Lock()
kmapNew, ok := c.kmm[keyPrefix]
if ok {
c.Unlock()
return kmapNew, nil
}
c.kmm[keyPrefix] = kmap
c.Unlock()
return kmap, nil
}
func (c *cache) add(kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
}
c.Lock()
kmap[Key(kvObject.Key()...)] = kvObject
c.Unlock()
return nil
}
func (c *cache) del(kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
}
c.Lock()
delete(kmap, Key(kvObject.Key()...))
c.Unlock()
return nil
}
func (c *cache) get(key string, kvObject KVObject) error {
kmap, err := c.kmap(kvObject)
if err != nil {
return err
}
c.Lock()
defer c.Unlock()
o, ok := kmap[Key(kvObject.Key()...)]
if !ok {
return ErrKeyNotFound
}
ctor, ok := o.(KVConstructor)
if !ok {
return fmt.Errorf("kvobject does not implement KVConstructor interface. could not get object")
}
return ctor.CopyTo(kvObject)
}
func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
kmap, err := c.kmap(kvObject)
if err != nil {
return nil, err
}
c.Lock()
defer c.Unlock()
var kvol []KVObject
for _, v := range kmap {
kvol = append(kvol, v)
}
return kvol, nil
}
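The cache is not used directly; it sits behind the local-scope DataStore created by NewDataStore (see datastore.go below), so ordinary GetObject/List calls are served from the in-memory kmap once it has been populated. A hedged usage sketch, where obj stands for any type implementing both KVObject and KVConstructor and the function name is hypothetical:

package sketch

import (
	"log"

	"github.com/docker/libnetwork/datastore"
)

// cachedLocalReads shows the cache being exercised indirectly: a nil
// config makes NewDataStore fall back to the default boltdb scope
// config, and local scope switches caching on.
func cachedLocalReads(obj datastore.KVObject) error {
	ds, err := datastore.NewDataStore(datastore.LocalScope, nil)
	if err != nil {
		return err
	}
	defer ds.Close()

	// Writes go through to boltdb and are then mirrored into the cache.
	if err := ds.PutObjectAtomic(obj); err != nil {
		return err
	}
	// Served from the in-memory kmap once populated, not from boltdb.
	objs, err := ds.List(datastore.Key(obj.KeyPrefix()...), obj)
	if err != nil {
		return err
	}
	log.Printf("listed %d objects from the cached local store", len(objs))
	return nil
}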

View File

@ -1,8 +1,11 @@
package datastore
import (
"fmt"
"log"
"reflect"
"strings"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
@ -10,26 +13,37 @@ import (
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/types"
)
//DataStore exported
type DataStore interface {
// GetObject gets data from datastore and unmarshals to the specified object
GetObject(key string, o KV) error
GetObject(key string, o KVObject) error
// PutObject adds a new Record based on an object into the datastore
PutObject(kvObject KV) error
PutObject(kvObject KVObject) error
// PutObjectAtomic provides an atomic add and update operation for a Record
PutObjectAtomic(kvObject KV) error
PutObjectAtomic(kvObject KVObject) error
// DeleteObject deletes a record
DeleteObject(kvObject KV) error
DeleteObject(kvObject KVObject) error
// DeleteObjectAtomic performs an atomic delete operation
DeleteObjectAtomic(kvObject KV) error
DeleteObjectAtomic(kvObject KVObject) error
// DeleteTree deletes a tree of records
DeleteTree(kvObject KV) error
DeleteTree(kvObject KVObject) error
// Watchable returns whether the store is watchable or not
Watchable() bool
// Watch for changes on a KVObject
Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
// List returns a list of KVObjects belonging to the parent
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
List(string, KVObject) ([]KVObject, error)
// Scope returns the scope of the store
Scope() string
// KVStore returns access to the KV Store
KVStore() store.Store
// Close closes the data store
Close()
}
// ErrKeyModified is raised for an atomic update when the update is working on a stale state
@ -39,11 +53,13 @@ var (
)
type datastore struct {
scope string
store store.Store
cache *cache
}
//KV Key Value interface used by objects to be part of the DataStore
type KV interface {
// KVObject is the Key/Value interface used by objects to be part of the DataStore
type KVObject interface {
// Key method lets an object provide the Key to be used in the KV Store
Key() []string
// KeyPrefix method lets an object return the immediate parent key that can be used for tree walk
@ -60,19 +76,40 @@ type KV interface {
// When SetIndex() is called, the object has been stored.
Exists() bool
// DataScope indicates the storage scope of the KV object
DataScope() DataScope
DataScope() string
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
Skip() bool
}
// DataScope indicates the storage scope
type DataScope int
// KVConstructor interface defines methods which can construct a KVObject from another.
type KVConstructor interface {
// New returns a new object which is created based on the
// source object
New() KVObject
// CopyTo deep copies the contents of the implementing object
// to the passed destination object
CopyTo(KVObject) error
}
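For a concrete sense of what implementing this pair involves, here is a hedged, self-contained sketch of a toy object satisfying both KVObject and KVConstructor. It mirrors what bitseq.Handle and endpoint do in this diff, but every name in it is hypothetical:

package sketch

import (
	"encoding/json"

	"github.com/docker/libnetwork/datastore"
)

// counter is a toy KVObject; Name and Count are persisted as JSON.
type counter struct {
	Name     string
	Count    uint64
	dbIndex  uint64
	dbExists bool
}

func (c *counter) Key() []string           { return []string{"counter", c.Name} }
func (c *counter) KeyPrefix() []string     { return []string{"counter"} }
func (c *counter) Value() []byte           { b, _ := json.Marshal(c); return b }
func (c *counter) SetValue(b []byte) error { return json.Unmarshal(b, c) }
func (c *counter) Index() uint64           { return c.dbIndex }
func (c *counter) SetIndex(i uint64)       { c.dbIndex, c.dbExists = i, true }
func (c *counter) Exists() bool            { return c.dbExists }
func (c *counter) Skip() bool              { return false }
func (c *counter) DataScope() string       { return datastore.LocalScope }

// KVConstructor: New hands the cache/List a blank object of the right
// type; CopyTo refreshes a caller's object from the cached copy.
func (c *counter) New() datastore.KVObject { return &counter{} }
func (c *counter) CopyTo(o datastore.KVObject) error {
	dst := o.(*counter)
	*dst = *c
	return nil
}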
// ScopeCfg represents Datastore configuration.
type ScopeCfg struct {
Embedded bool
Client ScopeClientCfg
}
// ScopeClientCfg represents Datastore Client-only mode configuration
type ScopeClientCfg struct {
Provider string
Address string
Config *store.Config
}
const (
// LocalScope indicates to store the KV object in local datastore such as boltdb
LocalScope DataScope = iota
LocalScope = "local"
// GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
GlobalScope
GlobalScope = "global"
defaultPrefix = "/var/lib/docker/network/files"
)
const (
@ -82,6 +119,27 @@ const (
EndpointKeyPrefix = "endpoint"
)
var (
defaultScopes = makeDefaultScopes()
)
func makeDefaultScopes() map[string]*ScopeCfg {
def := make(map[string]*ScopeCfg)
def[LocalScope] = &ScopeCfg{
Embedded: true,
Client: ScopeClientCfg{
Provider: "boltdb",
Address: defaultPrefix + "/boltdb.db",
Config: &store.Config{
Bucket: "libnetwork",
ConnectionTimeout: 3 * time.Second,
},
},
}
return def
}
var rootChain = []string{"docker", "libnetwork"}
func init() {
@ -91,6 +149,28 @@ func init() {
boltdb.Register()
}
// DefaultScopes returns a map of default scopes and their configs for clients to use.
func DefaultScopes(dataDir string) map[string]*ScopeCfg {
if dataDir != "" {
defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/boltdb.db"
return defaultScopes
}
defaultScopes[LocalScope].Client.Address = defaultPrefix + "/boltdb.db"
return defaultScopes
}
// IsValid checks if the scope config has valid configuration.
func (cfg *ScopeCfg) IsValid() bool {
if cfg == nil ||
strings.TrimSpace(cfg.Client.Provider) == "" ||
strings.TrimSpace(cfg.Client.Address) == "" {
return false
}
return true
}
// Key provides a convenient method to create a Key
func Key(key ...string) string {
keychain := append(rootChain, key...)
@ -110,7 +190,11 @@ func ParseKey(key string) ([]string, error) {
}
// newClient used to connect to KV Store
func newClient(kv string, addrs string, config *store.Config) (DataStore, error) {
func newClient(scope string, kv string, addrs string, config *store.Config, cached bool) (DataStore, error) {
if cached && scope != LocalScope {
return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
}
if config == nil {
config = &store.Config{}
}
@ -118,22 +202,82 @@ func newClient(kv string, addrs string, config *store.Config) (DataStore, error)
if err != nil {
return nil, err
}
ds := &datastore{store: store}
ds := &datastore{scope: scope, store: store}
if cached {
ds.cache = newCache(ds)
}
return ds, nil
}
// NewDataStore creates a new instance of LibKV data store
func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) {
if cfg == nil {
return nil, types.BadRequestErrorf("invalid configuration passed to datastore")
func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
c, ok := defaultScopes[scope]
if !ok || c.Client.Provider == "" || c.Client.Address == "" {
return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
}
cfg = c
}
// TODO : cfg.Embedded case
return newClient(cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config)
var cached bool
if scope == LocalScope {
cached = true
}
return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
}
// NewCustomDataStore can be used by clients to plug in a custom datastore that adheres to store.Store
func NewCustomDataStore(customStore store.Store) DataStore {
return &datastore{store: customStore}
func (ds *datastore) Close() {
ds.store.Close()
}
func (ds *datastore) Scope() string {
return ds.scope
}
func (ds *datastore) Watchable() bool {
return ds.scope != LocalScope
}
func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
sCh := make(chan struct{})
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
}
kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
if err != nil {
return nil, err
}
kvoCh := make(chan KVObject)
go func() {
for {
select {
case <-stopCh:
close(sCh)
return
case kvPair := <-kvpCh:
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
break
}
dstO.SetIndex(kvPair.LastIndex)
kvoCh <- dstO
}
}
}()
return kvoCh, nil
}
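A usage sketch for the typed Watch; only non-local scopes are watchable per Watchable above, and the helper name is hypothetical:

package sketch

import (
	"fmt"
	"log"

	"github.com/docker/libnetwork/datastore"
)

// watchObject consumes the typed watch channel; each received value is
// a fresh copy built via KVConstructor.New, so the consumer never shares
// mutable state with the store. The returned function stops the watch.
func watchObject(ds datastore.DataStore, obj datastore.KVObject) (func(), error) {
	if !ds.Watchable() {
		return nil, fmt.Errorf("scope %s is not watchable", ds.Scope())
	}
	stopCh := make(chan struct{})
	objCh, err := ds.Watch(obj, stopCh)
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			select {
			case o := <-objCh:
				log.Printf("update received, index %d", o.Index())
			case <-stopCh:
				return
			}
		}
	}()
	return func() { close(stopCh) }, nil
}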
func (ds *datastore) KVStore() store.Store {
@ -141,40 +285,71 @@ func (ds *datastore) KVStore() store.Store {
}
// PutObjectAtomic adds a new Record based on an object into the datastore
func (ds *datastore) PutObjectAtomic(kvObject KV) error {
func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
var (
previous *store.KVPair
pair *store.KVPair
err error
)
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
kvObjValue := kvObject.Value()
if kvObjValue == nil {
return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
}
var previous *store.KVPair
if kvObject.Skip() {
goto add_cache
}
if kvObject.Exists() {
previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
} else {
previous = nil
}
_, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
_, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
if err != nil {
return err
}
kvObject.SetIndex(pair.LastIndex)
add_cache:
if ds.cache != nil {
return ds.cache.add(kvObject)
}
return nil
}
// PutObject adds a new Record based on an object into the datastore
func (ds *datastore) PutObject(kvObject KV) error {
func (ds *datastore) PutObject(kvObject KVObject) error {
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
return ds.putObjectWithKey(kvObject, kvObject.Key()...)
if kvObject.Skip() {
goto add_cache
}
if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
return err
}
add_cache:
if ds.cache != nil {
return ds.cache.add(kvObject)
}
return nil
}
func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
kvObjValue := kvObject.Value()
if kvObjValue == nil {
@ -184,39 +359,128 @@ func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
}
// GetObject returns a record matching the key
func (ds *datastore) GetObject(key string, o KV) error {
func (ds *datastore) GetObject(key string, o KVObject) error {
if ds.cache != nil {
return ds.cache.get(key, o)
}
kvPair, err := ds.store.Get(key)
if err != nil {
return err
}
err = o.SetValue(kvPair.Value)
if err != nil {
if err := o.SetValue(kvPair.Value); err != nil {
return err
}
// Make sure the object has a correct view of the DB index in case we need to modify it
// and update the DB.
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
o.SetIndex(kvPair.LastIndex)
return nil
}
func (ds *datastore) ensureKey(key string) error {
exists, err := ds.store.Exists(key)
if err != nil {
return err
}
if exists {
return nil
}
return ds.store.Put(key, []byte{}, nil)
}
func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
if ds.cache != nil {
return ds.cache.list(kvObject)
}
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
}
// Make sure the parent key exists
if err := ds.ensureKey(key); err != nil {
return nil, err
}
kvList, err := ds.store.List(key)
if err != nil {
return nil, err
}
var kvol []KVObject
for _, kvPair := range kvList {
if len(kvPair.Value) == 0 {
continue
}
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
return nil, err
}
// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
dstO.SetIndex(kvPair.LastIndex)
kvol = append(kvol, dstO)
}
return kvol, nil
}
// DeleteObject unconditionally deletes a record from the store
func (ds *datastore) DeleteObject(kvObject KV) error {
func (ds *datastore) DeleteObject(kvObject KVObject) error {
// cleanup the cache first
if ds.cache != nil {
ds.cache.del(kvObject)
}
if kvObject.Skip() {
return nil
}
return ds.store.Delete(Key(kvObject.Key()...))
}
// DeleteObjectAtomic performs atomic delete on a record
func (ds *datastore) DeleteObjectAtomic(kvObject KV) error {
func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
_, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous)
return err
if kvObject.Skip() {
goto del_cache
}
if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
return err
}
del_cache:
// cleanup the cache only if AtomicDelete went through successfully
if ds.cache != nil {
return ds.cache.del(kvObject)
}
return nil
}
// DeleteTree unconditionally deletes a record from the store
func (ds *datastore) DeleteTree(kvObject KV) error {
func (ds *datastore) DeleteTree(kvObject KVObject) error {
// cleanup the cache first
if ds.cache != nil {
ds.cache.del(kvObject)
}
if kvObject.Skip() {
return nil
}
return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
}

View File

@ -5,7 +5,6 @@ import (
"reflect"
"testing"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/options"
_ "github.com/docker/libnetwork/testutils"
"github.com/stretchr/testify/assert"
@ -15,7 +14,7 @@ var dummyKey = "dummy"
// NewTestDataStore can be used by other tests in order to use a custom datastore
func NewTestDataStore() DataStore {
return &datastore{store: NewMockStore()}
return &datastore{scope: LocalScope, store: NewMockStore()}
}
func TestKey(t *testing.T) {
@ -38,11 +37,11 @@ func TestParseKey(t *testing.T) {
}
func TestInvalidDataStore(t *testing.T) {
config := &config.DatastoreCfg{}
config := &ScopeCfg{}
config.Embedded = false
config.Client.Provider = "invalid"
config.Client.Address = "localhost:8500"
_, err := NewDataStore(config)
_, err := NewDataStore(GlobalScope, config)
if err == nil {
t.Fatal("Invalid Datastore connection configuration must result in a failure")
}
@ -167,7 +166,7 @@ func (n *dummyObject) Skip() bool {
return n.SkipSave
}
func (n *dummyObject) DataScope() DataScope {
func (n *dummyObject) DataScope() string {
return LocalScope
}

View File

@ -1,10 +1,6 @@
package driverapi
import (
"net"
"github.com/docker/libnetwork/datastore"
)
import "net"
// NetworkPluginEndpointType represents the Endpoint Type used by Plugin system
const NetworkPluginEndpointType = "NetworkDriver"
@ -105,7 +101,7 @@ type DriverCallback interface {
// Capability represents the high level capabilities of the drivers which libnetwork can make use of
type Capability struct {
DataScope datastore.DataScope
DataScope string
}
// DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on

View File

@ -3,6 +3,7 @@ package libnetwork
import (
"strings"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi"
builtinIpam "github.com/docker/libnetwork/ipams/builtin"
@ -32,9 +33,9 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} {
config := make(map[string]interface{})
if c.validateGlobalStoreConfig() {
config[netlabel.KVProvider] = c.cfg.GlobalStore.Client.Provider
config[netlabel.KVProviderURL] = c.cfg.GlobalStore.Client.Address
if dcfg, ok := c.cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
config[netlabel.KVProvider] = dcfg.Client.Provider
config[netlabel.KVProviderURL] = dcfg.Client.Address
}
for _, label := range c.cfg.Daemon.Labels {

View File

@ -43,12 +43,16 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
return err
}
// Since we perform lazy configuration, make sure we try
// configuring the driver when we enter CreateEndpoint, since
// CreateNetwork may not be called on every node.
if err := d.configure(); err != nil {
return err
}
n := d.network(nid)
if n == nil {
n, err = d.createNetworkfromStore(nid)
if err != nil {
return fmt.Errorf("network id %q not found", nid)
}
return fmt.Errorf("network id %q not found", nid)
}
ep := &endpoint{

View File

@ -45,12 +45,13 @@ type network struct {
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
var err error
if id == "" {
return fmt.Errorf("invalid network id")
}
if err = d.configure(); err != nil {
// Since we perform lazy configuration, make sure we try
// configuring the driver when we enter CreateNetwork
if err := d.configure(); err != nil {
return err
}
@ -71,29 +72,16 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat
n.subnets = append(n.subnets, s)
}
for {
// If the datastore has the network object already
// there is no need to do a write.
err = d.store.GetObject(datastore.Key(n.Key()...), n)
if err == nil || err != datastore.ErrKeyNotFound {
break
}
err = n.writeToStore()
if err == nil || err != datastore.ErrKeyModified {
break
}
}
if err != nil {
if err := n.writeToStore(); err != nil {
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
}
d.addNetwork(n)
return nil
}
func (d *driver) createNetworkfromStore(nid string) (*network, error) {
/* func (d *driver) createNetworkfromStore(nid string) (*network, error) {
n := &network{
id: nid,
driver: d,
@ -107,7 +95,7 @@ func (d *driver) createNetworkfromStore(nid string) (*network, error) {
return nil, fmt.Errorf("unable to get network %q from data store, %v", nid, err)
}
return n, nil
}
}*/
func (d *driver) DeleteNetwork(nid string) error {
if nid == "" {
@ -313,9 +301,34 @@ func (d *driver) deleteNetwork(nid string) {
func (d *driver) network(nid string) *network {
d.Lock()
defer d.Unlock()
networks := d.networks
d.Unlock()
return d.networks[nid]
n, ok := networks[nid]
if !ok {
n = d.getNetworkFromStore(nid)
if n != nil {
n.driver = d
n.endpoints = endpointTable{}
n.once = &sync.Once{}
networks[nid] = n
}
}
return n
}
func (d *driver) getNetworkFromStore(nid string) *network {
if d.store == nil {
return nil
}
n := &network{id: nid}
if err := d.store.GetObject(datastore.Key(n.Key()...), n); err != nil {
return nil
}
return n
}
func (n *network) sandbox() osl.Sandbox {
@ -408,30 +421,23 @@ func (n *network) SetValue(value []byte) error {
subnetIP, _ := types.ParseCIDR(subnetIPstr)
gwIP, _ := types.ParseCIDR(gwIPstr)
// If the network is being created by reading from the
// datastore, subnets have to be created. If the network
// already exists, update only the subnets' vni field
if len(n.subnets) == 0 {
s := &subnet{
subnetIP: subnetIP,
gwIP: gwIP,
vni: vni,
once: &sync.Once{},
}
n.subnets = append(n.subnets, s)
return nil
s := &subnet{
subnetIP: subnetIP,
gwIP: gwIP,
vni: vni,
once: &sync.Once{},
}
n.subnets = append(n.subnets, s)
sNet := n.getMatchingSubnet(subnetIP)
if sNet != nil {
if vni != 0 {
sNet.vni = vni
}
sNet.vni = vni
}
return nil
}
func (n *network) DataScope() datastore.DataScope {
func (n *network) DataScope() string {
return datastore.GlobalScope
}

View File

@ -6,7 +6,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/idm"
@ -84,8 +83,8 @@ func (d *driver) configure() error {
provURL, urlOk := d.config[netlabel.KVProviderURL]
if provOk && urlOk {
cfg := &config.DatastoreCfg{
Client: config.DatastoreClientCfg{
cfg := &datastore.ScopeCfg{
Client: datastore.ScopeClientCfg{
Provider: provider.(string),
Address: provURL.(string),
},
@ -94,7 +93,7 @@ func (d *driver) configure() error {
if confOk {
cfg.Client.Config = provConfig.(*store.Config)
}
d.store, err = datastore.NewDataStore(cfg)
d.store, err = datastore.NewDataStore(datastore.GlobalScope, cfg)
if err != nil {
err = fmt.Errorf("failed to initialize data store: %v", err)
return

View File

@ -12,6 +12,7 @@ import (
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
"github.com/docker/libnetwork/types"
)
@ -107,6 +108,37 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
return nil
}
func (ep *endpoint) New() datastore.KVObject {
return &endpoint{network: ep.getNetwork()}
}
func (ep *endpoint) CopyTo(o datastore.KVObject) error {
ep.Lock()
defer ep.Unlock()
dstEp := o.(*endpoint)
dstEp.name = ep.name
dstEp.id = ep.id
dstEp.sandboxID = ep.sandboxID
dstEp.dbIndex = ep.dbIndex
dstEp.dbExists = ep.dbExists
if ep.iface != nil {
dstEp.iface = &endpointInterface{}
ep.iface.CopyTo(dstEp.iface)
}
dstEp.exposedPorts = make([]types.TransportPort, len(ep.exposedPorts))
copy(dstEp.exposedPorts, ep.exposedPorts)
dstEp.generic = options.Generic{}
for k, v := range ep.generic {
dstEp.generic[k] = v
}
return nil
}
func (ep *endpoint) ID() string {
ep.Lock()
defer ep.Unlock()
@ -122,16 +154,28 @@ func (ep *endpoint) Name() string {
}
func (ep *endpoint) Network() string {
return ep.getNetwork().name
if ep.network == nil {
return ""
}
return ep.network.name
}
// endpoint Key structure : endpoint/network-id/endpoint-id
func (ep *endpoint) Key() []string {
return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id, ep.id}
if ep.network == nil {
return nil
}
return []string{datastore.EndpointKeyPrefix, ep.network.id, ep.id}
}
func (ep *endpoint) KeyPrefix() []string {
return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id}
if ep.network == nil {
return nil
}
return []string{datastore.EndpointKeyPrefix, ep.network.id}
}
func (ep *endpoint) networkIDFromKey(key string) (string, error) {
@ -177,7 +221,7 @@ func (ep *endpoint) Exists() bool {
}
func (ep *endpoint) Skip() bool {
return ep.getNetwork().Skip()
return ep.getNetwork().Skip() || ep.DataScope() == datastore.LocalScope
}
func (ep *endpoint) processOptions(options ...EndpointOption) {
@ -191,8 +235,22 @@ func (ep *endpoint) processOptions(options ...EndpointOption) {
}
}
func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
func (ep *endpoint) getNetwork() *network {
ep.Lock()
defer ep.Unlock()
return ep.network
}
func (ep *endpoint) getNetworkFromStore() (*network, error) {
if ep.network == nil {
return nil, fmt.Errorf("invalid network object in endpoint %s", ep.Name())
}
return ep.network.ctrlr.getNetworkFromStore(ep.network.id)
}
func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
if sbox == nil {
return types.BadRequestErrorf("endpoint cannot be joined by nil container")
}
@ -215,15 +273,27 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
network, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network from store during join: %v", err)
}
ep, err = network.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during join: %v", err)
}
ep.Lock()
if ep.sandboxID != "" {
ep.Unlock()
return types.ForbiddenErrorf("a sandbox has already joined the endpoint")
}
ep.Unlock()
ep.Lock()
ep.network = network
ep.sandboxID = sbox.ID()
ep.joinInfo = &endpointJoinInfo{}
network := ep.network
epid := ep.id
ep.Unlock()
defer func() {
@ -235,12 +305,16 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
}()
network.Lock()
driver := network.driver
nid := network.id
network.Unlock()
ep.processOptions(options...)
driver, err := network.driver()
if err != nil {
return fmt.Errorf("failed to join endpoint: %v", err)
}
err = driver.Join(nid, epid, sbox.Key(), ep, sbox.Labels())
if err != nil {
return err
@ -262,14 +336,15 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
return err
}
if err = sb.updateDNS(ep.getNetwork().enableIPv6); err != nil {
// Watch for service records
network.getController().watchSvcRecord(ep)
if err = sb.updateDNS(network.enableIPv6); err != nil {
return err
}
if !ep.isLocalScoped() {
if err = network.ctrlr.updateToStore(ep); err != nil {
return err
}
if err = network.getController().updateToStore(ep); err != nil {
return err
}
sb.Lock()
@ -327,6 +402,16 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
n, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network from store during leave: %v", err)
}
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during leave: %v", err)
}
ep.Lock()
sid := ep.sandboxID
ep.Unlock()
@ -342,21 +427,19 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
ep.Lock()
ep.sandboxID = ""
n := ep.network
ep.network = n
ep.Unlock()
n.Lock()
c := n.ctrlr
d := n.driver
n.Unlock()
if err := n.getController().updateToStore(ep); err != nil {
ep.Lock()
ep.sandboxID = sid
ep.Unlock()
return err
}
if !ep.isLocalScoped() {
if err := c.updateToStore(ep); err != nil {
ep.Lock()
ep.sandboxID = sid
ep.Unlock()
return err
}
d, err := n.driver()
if err != nil {
return fmt.Errorf("failed to leave endpoint: %v", err)
}
if err := d.Leave(n.id, ep.id); err != nil {
@ -367,6 +450,9 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
return err
}
// unwatch for service records
n.getController().unWatchSvcRecord(ep)
if sb.needDefaultGW() {
ep := sb.getEPwithoutGateway()
if ep == nil {
@ -379,49 +465,48 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
func (ep *endpoint) Delete() error {
var err error
n, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network during Delete: %v", err)
}
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during Delete: %v", err)
}
ep.Lock()
epid := ep.id
name := ep.name
n := ep.network
if ep.sandboxID != "" {
ep.Unlock()
return &ActiveContainerError{name: name, id: epid}
}
n.Lock()
ctrlr := n.ctrlr
n.Unlock()
ep.Unlock()
if !ep.isLocalScoped() {
if err = ctrlr.deleteFromStore(ep); err != nil {
return err
}
}
defer func() {
if err != nil {
ep.dbExists = false
if !ep.isLocalScoped() {
if e := ctrlr.updateToStore(ep); e != nil {
log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
}
}
}
}()
// Update the endpoint count in network and update it in the datastore
n.DecEndpointCnt()
if err = ctrlr.updateToStore(n); err != nil {
if err = n.DecEndpointCnt(); err != nil {
return err
}
defer func() {
if err != nil {
n.IncEndpointCnt()
if e := ctrlr.updateToStore(n); e != nil {
if e := n.IncEndpointCnt(); e != nil {
log.Warnf("failed to update network %s : %v", n.name, e)
}
}
}()
if err = n.getController().deleteFromStore(ep); err != nil {
return err
}
defer func() {
if err != nil {
ep.dbExists = false
if e := n.getController().updateToStore(ep); e != nil {
log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
}
}
}()
if err = ep.deleteEndpoint(); err != nil {
return err
}
@ -438,38 +523,21 @@ func (ep *endpoint) deleteEndpoint() error {
epid := ep.id
ep.Unlock()
n.Lock()
_, ok := n.endpoints[epid]
if !ok {
n.Unlock()
return nil
driver, err := n.driver()
if err != nil {
return fmt.Errorf("failed to delete endpoint: %v", err)
}
nid := n.id
driver := n.driver
delete(n.endpoints, epid)
n.Unlock()
if err := driver.DeleteEndpoint(nid, epid); err != nil {
if err := driver.DeleteEndpoint(n.id, epid); err != nil {
if _, ok := err.(types.ForbiddenError); ok {
n.Lock()
n.endpoints[epid] = ep
n.Unlock()
return err
}
log.Warnf("driver error deleting endpoint %s : %v", name, err)
}
n.updateSvcRecord(ep, false)
return nil
}
func (ep *endpoint) getNetwork() *network {
ep.Lock()
defer ep.Unlock()
return ep.network
}
func (ep *endpoint) getSandbox() (*sandbox, bool) {
ep.Lock()
c := ep.network.getController()
@ -545,14 +613,8 @@ func JoinOptionPriority(ep Endpoint, prio int) EndpointOption {
}
}
func (ep *endpoint) DataScope() datastore.DataScope {
ep.Lock()
defer ep.Unlock()
return ep.network.dataScope
}
func (ep *endpoint) isLocalScoped() bool {
return ep.DataScope() == datastore.LocalScope
func (ep *endpoint) DataScope() string {
return ep.getNetwork().DataScope()
}
func (ep *endpoint) assignAddress() error {

View File

@ -2,6 +2,7 @@ package libnetwork
import (
"encoding/json"
"fmt"
"net"
"github.com/docker/libnetwork/driverapi"
@ -115,6 +116,21 @@ func (epi *endpointInterface) UnmarshalJSON(b []byte) error {
return nil
}
func (epi *endpointInterface) CopyTo(dstEpi *endpointInterface) error {
dstEpi.mac = types.GetMacCopy(epi.mac)
dstEpi.addr = types.GetIPNetCopy(epi.addr)
dstEpi.addrv6 = types.GetIPNetCopy(epi.addrv6)
dstEpi.srcName = epi.srcName
dstEpi.dstPrefix = epi.dstPrefix
dstEpi.poolID = epi.poolID
for _, route := range epi.routes {
dstEpi.routes = append(dstEpi.routes, types.GetIPNetCopy(route))
}
return nil
}
type endpointJoinInfo struct {
gw net.IP
gw6 net.IP
@ -122,21 +138,38 @@ type endpointJoinInfo struct {
}
func (ep *endpoint) Info() EndpointInfo {
return ep
n, err := ep.getNetworkFromStore()
if err != nil {
return nil
}
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return nil
}
sb, ok := ep.getSandbox()
if !ok {
// endpoint hasn't joined any sandbox.
// Just return the endpoint
return ep
}
return sb.getEndpoint(ep.ID())
}
func (ep *endpoint) DriverInfo() (map[string]interface{}, error) {
ep.Lock()
network := ep.network
epid := ep.id
ep.Unlock()
n, err := ep.getNetworkFromStore()
if err != nil {
return nil, fmt.Errorf("could not find network in store for driver info: %v", err)
}
network.Lock()
driver := network.driver
nid := network.id
network.Unlock()
driver, err := n.driver()
if err != nil {
return nil, fmt.Errorf("failed to get driver info: %v", err)
}
return driver.EndpointOperInfo(nid, epid)
return driver.EndpointOperInfo(n.ID(), ep.ID())
}
func (ep *endpoint) Iface() InterfaceInfo {

View File

@ -6,7 +6,6 @@ import (
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/bitseq"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/ipamapi"
@ -30,13 +29,10 @@ const (
type Allocator struct {
// Predefined pools for default address spaces
predefined map[string][]*net.IPNet
// Static subnet information
localSubnets *PoolsConfig
globalSubnets *PoolsConfig
addrSpaces map[string]*addrSpace
// stores []datastore.Datastore
// Allocated addresses in each address space's subnet
addresses map[SubnetKey]*bitseq.Handle
// Datastore
addrSpace2Configs map[string]*PoolsConfig
sync.Mutex
}
@ -44,73 +40,86 @@ type Allocator struct {
func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) {
a := &Allocator{}
a.localSubnets = &PoolsConfig{
subnets: map[SubnetKey]*PoolData{},
id: dsConfigKey + "/Pools",
scope: datastore.LocalScope,
ds: lcDs,
alloc: a,
}
a.globalSubnets = &PoolsConfig{
subnets: map[SubnetKey]*PoolData{},
id: dsConfigKey + "/Pools",
scope: datastore.GlobalScope,
ds: glDs,
alloc: a,
}
// Load predefined subnet pools
a.predefined = map[string][]*net.IPNet{
localAddressSpace: initLocalPredefinedPools(),
globalAddressSpace: initGlobalPredefinedPools(),
}
a.addrSpace2Configs = map[string]*PoolsConfig{
localAddressSpace: a.localSubnets,
globalAddressSpace: a.globalSubnets,
}
// Initialize bitseq map
a.addresses = make(map[SubnetKey]*bitseq.Handle)
cfgs := []struct {
cfg *PoolsConfig
dsc string
// Initialize address spaces
a.addrSpaces = make(map[string]*addrSpace)
for _, aspc := range []struct {
as string
ds datastore.DataStore
}{
{a.localSubnets, "local"},
{a.globalSubnets, "global"},
}
// Get the initial local/global pools config from the datastores
var inserterList []func() error
for _, e := range cfgs {
if e.cfg.ds == nil {
{localAddressSpace, lcDs},
{globalAddressSpace, glDs},
} {
if aspc.ds == nil {
continue
}
if err := e.cfg.watchForChanges(); err != nil {
log.Warnf("Error on registering watch for %s datastore: %v", e.dsc, err)
}
if err := e.cfg.readFromStore(); err != nil && err != store.ErrKeyNotFound {
return nil, fmt.Errorf("failed to retrieve the ipam %s pools config from datastore: %v", e.dsc, err)
}
e.cfg.Lock()
for k, v := range e.cfg.subnets {
if v.Range == nil {
inserterList = append(inserterList, func() error { return a.insertBitMask(e.cfg.ds, k, v.Pool) })
}
}
e.cfg.Unlock()
}
// Add the bitmasks (data could come from datastore)
if inserterList != nil {
for _, f := range inserterList {
if err := f(); err != nil {
return nil, err
}
a.addrSpaces[aspc.as] = &addrSpace{
subnets: map[SubnetKey]*PoolData{},
id: dsConfigKey + "/" + aspc.as,
scope: aspc.ds.Scope(),
ds: aspc.ds,
alloc: a,
}
}
return a, nil
}
func (a *Allocator) refresh(as string) error {
aSpace, err := a.getAddressSpaceFromStore(as)
if err != nil {
return fmt.Errorf("error getting pools config from store during init: %v",
err)
}
if aSpace == nil {
return nil
}
if err := a.updateBitMasks(aSpace); err != nil {
return fmt.Errorf("error updating bit masks during init: %v", err)
}
a.Lock()
a.addrSpaces[as] = aSpace
a.Unlock()
return nil
}
func (a *Allocator) updateBitMasks(aSpace *addrSpace) error {
var inserterList []func() error
aSpace.Lock()
for k, v := range aSpace.subnets {
if v.Range == nil {
inserterList = append(inserterList,
func() error { return a.insertBitMask(k, v.Pool) })
}
}
aSpace.Unlock()
// Add the bitmasks (data could come from datastore)
if inserterList != nil {
for _, f := range inserterList {
if err := f(); err != nil {
return err
}
}
}
return nil
}
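updateBitMasks collects the bitmask insertions as closures while holding the address-space lock and runs them only after unlocking, keeping datastore I/O out of the critical section. A hedged sketch of that collect-then-run pattern; the k, v := k, v shadowing is a defensive addition (needed before Go 1.22 so each closure captures its own copies), not something the diff itself does:

package sketch

import "sync"

type key string
type pool struct{ cidr string }

type space struct {
	sync.Mutex
	subnets map[key]*pool
}

// collectInserts gathers one deferred action per subnet under the lock and
// executes them only after the lock is released.
func collectInserts(s *space, insert func(key, *pool) error) error {
	var inserters []func() error

	s.Lock()
	for k, v := range s.subnets {
		k, v := k, v // capture per-iteration copies (required before Go 1.22)
		inserters = append(inserters, func() error { return insert(k, v) })
	}
	s.Unlock()

	for _, f := range inserters {
		if err := f(); err != nil {
			return err
		}
	}
	return nil
}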
// GetDefaultAddressSpaces returns the local and global default address spaces
func (a *Allocator) GetDefaultAddressSpaces() (string, string, error) {
return localAddressSpace, globalAddressSpace, nil
@ -123,25 +132,29 @@ func (a *Allocator) RequestPool(addressSpace, pool, subPool string, options map[
return "", nil, nil, ipamapi.ErrInvalidPool
}
cfg, err := a.getPoolsConfig(addressSpace)
retry:
if err := a.refresh(addressSpace); err != nil {
return "", nil, nil, err
}
aSpace, err := a.getAddrSpace(addressSpace)
if err != nil {
return "", nil, nil, err
}
retry:
insert, err := cfg.updatePoolDBOnAdd(*k, nw, ipr)
insert, err := aSpace.updatePoolDBOnAdd(*k, nw, ipr)
if err != nil {
return "", nil, nil, err
}
if err := cfg.writeToStore(); err != nil {
if err := a.writeToStore(aSpace); err != nil {
if _, ok := err.(types.RetryError); !ok {
return "", nil, nil, types.InternalErrorf("pool configuration failed because of %s", err.Error())
}
if erru := cfg.readFromStore(); erru != nil {
return "", nil, nil, fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err)
}
goto retry
}
return k.String(), aw, nil, insert()
}
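RequestPool is the template for the optimistic-concurrency loop used throughout this change: refresh, mutate a local copy, attempt an atomic write, and on a retryable conflict jump back and try again. A compact sketch of the same loop with a stand-in RetryError (libnetwork's lives in the types package):

package sketch

import "fmt"

// RetryError marks write conflicts that are safe to retry after a refresh.
type RetryError struct{ msg string }

func (e RetryError) Error() string { return e.msg }

// updateAtomically keeps retrying refresh+apply+write until the write either
// succeeds or fails with a non-retryable error.
func updateAtomically(refresh, apply, write func() error) error {
	for {
		if err := refresh(); err != nil {
			return fmt.Errorf("refresh failed: %v", err)
		}
		if err := apply(); err != nil {
			return err
		}
		err := write()
		if err == nil {
			return nil
		}
		if _, ok := err.(RetryError); !ok {
			return fmt.Errorf("update failed: %v", err)
		}
		// Lost the race: another writer bumped the index. Loop and retry.
	}
}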
@ -152,23 +165,25 @@ func (a *Allocator) ReleasePool(poolID string) error {
return types.BadRequestErrorf("invalid pool id: %s", poolID)
}
cfg, err := a.getPoolsConfig(k.AddressSpace)
retry:
if err := a.refresh(k.AddressSpace); err != nil {
return err
}
aSpace, err := a.getAddrSpace(k.AddressSpace)
if err != nil {
return err
}
retry:
remove, err := cfg.updatePoolDBOnRemoval(k)
remove, err := aSpace.updatePoolDBOnRemoval(k)
if err != nil {
return err
}
if err = cfg.writeToStore(); err != nil {
if err = a.writeToStore(aSpace); err != nil {
if _, ok := err.(types.RetryError); !ok {
return types.InternalErrorf("pool (%s) removal failed because of %v", poolID, err)
}
if erru := cfg.readFromStore(); erru != nil {
return fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err)
}
goto retry
}
@ -177,14 +192,14 @@ retry:
// Given the address space, returns the local or global PoolConfig based on whether
// the address space is local or global. AddressSpace locality is registered with IPAM out of band.
func (a *Allocator) getPoolsConfig(addrSpace string) (*PoolsConfig, error) {
func (a *Allocator) getAddrSpace(as string) (*addrSpace, error) {
a.Lock()
defer a.Unlock()
cfg, ok := a.addrSpace2Configs[addrSpace]
aSpace, ok := a.addrSpaces[as]
if !ok {
return nil, types.BadRequestErrorf("cannot find locality of address space: %s", addrSpace)
return nil, types.BadRequestErrorf("cannot find locality of address space: %s", as)
}
return cfg, nil
return aSpace, nil
}
func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool) (*SubnetKey, *net.IPNet, *net.IPNet, *AddressRange, error) {
@ -224,8 +239,14 @@ func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool
return &SubnetKey{AddressSpace: addressSpace, Subnet: nw.String(), ChildSubnet: subPool}, nw, aw, ipr, nil
}
func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool *net.IPNet) error {
func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
log.Debugf("Inserting bitmask (%s, %s)", key.String(), pool.String())
store := a.getStore(key.AddressSpace)
if store == nil {
return fmt.Errorf("could not find store for address space %s while inserting bit mask", key.AddressSpace)
}
ipVer := getAddressVersion(pool.IP)
ones, bits := pool.Mask.Size()
numAddresses := uint32(1 << uint(bits-ones))
@ -252,13 +273,13 @@ func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool
return nil
}
func (a *Allocator) retrieveBitmask(ds datastore.DataStore, k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) {
func (a *Allocator) retrieveBitmask(k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) {
a.Lock()
bm, ok := a.addresses[k]
a.Unlock()
if !ok {
log.Debugf("Retrieving bitmask (%s, %s)", k.String(), n.String())
if err := a.insertBitMask(ds, k, n); err != nil {
if err := a.insertBitMask(k, n); err != nil {
return nil, fmt.Errorf("could not find bitmask in datastore for %s", k.String())
}
a.Lock()
@ -289,7 +310,7 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error)
return nil, fmt.Errorf("no default pool availbale for non-default addresss spaces")
}
cfg, err := a.getPoolsConfig(as)
aSpace, err := a.getAddrSpace(as)
if err != nil {
return nil, err
}
@ -298,14 +319,14 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error)
if v != getAddressVersion(nw.IP) {
continue
}
cfg.Lock()
_, ok := cfg.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}]
cfg.Unlock()
aSpace.Lock()
_, ok := aSpace.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}]
aSpace.Unlock()
if ok {
continue
}
if !cfg.contains(as, nw) {
if !aSpace.contains(as, nw) {
if as == localAddressSpace {
if err := netutils.CheckRouteOverlaps(nw); err == nil {
return nw, nil
@ -326,31 +347,35 @@ func (a *Allocator) RequestAddress(poolID string, prefAddress net.IP, opts map[s
return nil, nil, types.BadRequestErrorf("invalid pool id: %s", poolID)
}
cfg, err := a.getPoolsConfig(k.AddressSpace)
if err := a.refresh(k.AddressSpace); err != nil {
return nil, nil, err
}
aSpace, err := a.getAddrSpace(k.AddressSpace)
if err != nil {
return nil, nil, err
}
cfg.Lock()
p, ok := cfg.subnets[k]
aSpace.Lock()
p, ok := aSpace.subnets[k]
if !ok {
cfg.Unlock()
aSpace.Unlock()
return nil, nil, types.NotFoundErrorf("cannot find address pool for poolID:%s", poolID)
}
if prefAddress != nil && !p.Pool.Contains(prefAddress) {
cfg.Unlock()
aSpace.Unlock()
return nil, nil, ipamapi.ErrIPOutOfRange
}
c := p
for c.Range != nil {
k = c.ParentKey
c, ok = cfg.subnets[k]
c, ok = aSpace.subnets[k]
}
cfg.Unlock()
aSpace.Unlock()
bm, err := a.retrieveBitmask(cfg.ds, k, c.Pool)
bm, err := a.retrieveBitmask(k, c.Pool)
if err != nil {
return nil, nil, fmt.Errorf("could not find bitmask in datastore for %s on address %v request from pool %s: %v",
k.String(), prefAddress, poolID, err)
@ -370,29 +395,33 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error {
return types.BadRequestErrorf("invalid pool id: %s", poolID)
}
cfg, err := a.getPoolsConfig(k.AddressSpace)
if err := a.refresh(k.AddressSpace); err != nil {
return err
}
aSpace, err := a.getAddrSpace(k.AddressSpace)
if err != nil {
return err
}
cfg.Lock()
p, ok := cfg.subnets[k]
aSpace.Lock()
p, ok := aSpace.subnets[k]
if !ok {
cfg.Unlock()
aSpace.Unlock()
return ipamapi.ErrBadPool
}
if address == nil || !p.Pool.Contains(address) {
cfg.Unlock()
aSpace.Unlock()
return ipamapi.ErrInvalidRequest
}
c := p
for c.Range != nil {
k = c.ParentKey
c = cfg.subnets[k]
c = aSpace.subnets[k]
}
cfg.Unlock()
aSpace.Unlock()
mask := p.Pool.Mask
if p.Range != nil {
@ -403,7 +432,7 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error {
return fmt.Errorf("failed to release address %s: %v", address.String(), err)
}
bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool)
bm, err := a.retrieveBitmask(k, c.Pool)
if err != nil {
return fmt.Errorf("could not find bitmask in datastore for %s on address %v release from pool %s: %v",
k.String(), address, poolID, err)
@ -449,23 +478,20 @@ func (a *Allocator) DumpDatabase() string {
a.Lock()
defer a.Unlock()
s := fmt.Sprintf("\n\nLocal Pool Config")
a.localSubnets.Lock()
for k, config := range a.localSubnets.subnets {
s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config))
var s string
for as, aSpace := range a.addrSpaces {
s = fmt.Sprintf("%s\n\n%s Config", s, as)
aSpace.Lock()
for k, config := range aSpace.subnets {
s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config))
}
aSpace.Unlock()
}
a.localSubnets.Unlock()
s = fmt.Sprintf("%s\n\nGlobal Pool Config", s)
a.globalSubnets.Lock()
for k, config := range a.globalSubnets.subnets {
s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config))
}
a.globalSubnets.Unlock()
s = fmt.Sprintf("%s\n\nBitmasks", s)
for k, bm := range a.addresses {
s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n\t%s: %s\n\t%d", k, bm, bm.Unselected()))
}
return s
}

View File

@ -11,7 +11,6 @@ import (
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/bitseq"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netutils"
@ -32,9 +31,9 @@ func randomLocalStore() (datastore.DataStore, error) {
if err := tmp.Close(); err != nil {
return nil, fmt.Errorf("Error closing temp file: %v", err)
}
return datastore.NewDataStore(&config.DatastoreCfg{
return datastore.NewDataStore(datastore.LocalScope, &datastore.ScopeCfg{
Embedded: true,
Client: config.DatastoreClientCfg{
Client: datastore.ScopeClientCfg{
Provider: "boltdb",
Address: defaultPrefix + tmp.Name(),
Config: &store.Config{
@ -191,7 +190,11 @@ func TestSubnetsMarshal(t *testing.T) {
t.Fatal(err)
}
cfg := a.localSubnets
cfg, err := a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
ba := cfg.Value()
if err := cfg.SetValue(ba); err != nil {
t.Fatal(err)
@ -221,7 +224,7 @@ func TestAddSubnets(t *testing.T) {
if err != nil {
t.Fatal(err)
}
a.addrSpace2Configs["abc"] = a.addrSpace2Configs[localAddressSpace]
a.addrSpaces["abc"] = a.addrSpaces[localAddressSpace]
pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false)
if err != nil {
@ -290,7 +293,13 @@ func TestAddReleasePoolID(t *testing.T) {
if err != nil {
t.Fatal(err)
}
subnets := a.localSubnets.subnets
aSpace, err := a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets := aSpace.subnets
pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false)
if err != nil {
t.Fatalf("Unexpected failure in adding pool")
@ -298,6 +307,14 @@ func TestAddReleasePoolID(t *testing.T) {
if err := k0.FromString(pid0); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k0].RefCount != 1 {
t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
}
@ -309,6 +326,13 @@ func TestAddReleasePoolID(t *testing.T) {
if err := k1.FromString(pid1); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k1].RefCount != 1 {
t.Fatalf("Unexpected ref count for %s: %d", k1, subnets[k1].RefCount)
}
@ -323,6 +347,13 @@ func TestAddReleasePoolID(t *testing.T) {
if err := k2.FromString(pid2); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k2].RefCount != 2 {
t.Fatalf("Unexpected ref count for %s: %d", k2, subnets[k2].RefCount)
}
@ -334,12 +365,26 @@ func TestAddReleasePoolID(t *testing.T) {
if err := a.ReleasePool(pid1); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k0].RefCount != 2 {
t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
}
if err := a.ReleasePool(pid0); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k0].RefCount != 1 {
t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
}
@ -351,6 +396,13 @@ func TestAddReleasePoolID(t *testing.T) {
if pid00 != pid0 {
t.Fatalf("main pool should still exist")
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k0].RefCount != 2 {
t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
}
@ -358,6 +410,13 @@ func TestAddReleasePoolID(t *testing.T) {
if err := a.ReleasePool(pid2); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k0].RefCount != 1 {
t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
}
@ -365,6 +424,13 @@ func TestAddReleasePoolID(t *testing.T) {
if err := a.ReleasePool(pid00); err != nil {
t.Fatal(err)
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if bp, ok := subnets[k0]; ok {
t.Fatalf("Base pool %s is still present: %v", k0, bp)
}
@ -373,6 +439,13 @@ func TestAddReleasePoolID(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected failure in adding pool")
}
aSpace, err = a.getAddrSpace(localAddressSpace)
if err != nil {
t.Fatal(err)
}
subnets = aSpace.subnets
if subnets[k0].RefCount != 1 {
t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount)
}
@ -417,18 +490,6 @@ func TestPredefinedPool(t *testing.T) {
if nw != a.predefined[localAddressSpace][i] {
t.Fatalf("Unexpected default network returned: %s", nw)
}
i, available, err = getFirstAvailablePool(a, globalAddressSpace, 2)
if err != nil {
t.Skip(err)
}
nw, err = a.getPredefinedPool(globalAddressSpace, false)
if err != nil {
t.Fatal(err)
}
if nw != available {
t.Fatalf("Unexpected default network returned: %s", nw)
}
}
func getFirstAvailablePool(a *Allocator, as string, atLeast int) (int, *net.IPNet, error) {
@ -475,7 +536,13 @@ func TestRemoveSubnet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
a.addrSpace2Configs["splane"] = a.addrSpace2Configs[localAddressSpace]
a.addrSpaces["splane"] = &addrSpace{
id: dsConfigKey + "/" + "splane",
ds: a.addrSpaces[localAddressSpace].ds,
alloc: a.addrSpaces[localAddressSpace].alloc,
scope: a.addrSpaces[localAddressSpace].scope,
subnets: map[SubnetKey]*PoolData{},
}
input := []struct {
addrSpace string
@ -512,7 +579,13 @@ func TestGetSameAddress(t *testing.T) {
if err != nil {
t.Fatal(err)
}
a.addrSpace2Configs["giallo"] = a.addrSpace2Configs[localAddressSpace]
a.addrSpaces["giallo"] = &addrSpace{
id: dsConfigKey + "/" + "giallo",
ds: a.addrSpaces[localAddressSpace].ds,
alloc: a.addrSpaces[localAddressSpace].alloc,
scope: a.addrSpaces[localAddressSpace].scope,
subnets: map[SubnetKey]*PoolData{},
}
pid, _, _, err := a.RequestPool("giallo", "192.168.100.0/24", "", nil, false)
if err != nil {
@ -536,7 +609,13 @@ func TestRequestReleaseAddressFromSubPool(t *testing.T) {
if err != nil {
t.Fatal(err)
}
a.addrSpace2Configs["rosso"] = a.addrSpace2Configs[localAddressSpace]
a.addrSpaces["rosso"] = &addrSpace{
id: dsConfigKey + "/" + "rosso",
ds: a.addrSpaces[localAddressSpace].ds,
alloc: a.addrSpaces[localAddressSpace].alloc,
scope: a.addrSpaces[localAddressSpace].scope,
subnets: map[SubnetKey]*PoolData{},
}
poolID, _, _, err := a.RequestPool("rosso", "172.28.0.0/16", "172.28.30.0/24", nil, false)
if err != nil {
@ -615,17 +694,23 @@ func TestGetAddress(t *testing.T) {
func TestRequestSyntaxCheck(t *testing.T) {
var (
pool = "192.168.0.0/16"
subPool = "192.168.0.0/24"
addrSpace = "green"
err error
pool = "192.168.0.0/16"
subPool = "192.168.0.0/24"
as = "green"
err error
)
a, err := getAllocator()
if err != nil {
t.Fatal(err)
}
a.addrSpace2Configs[addrSpace] = a.addrSpace2Configs[localAddressSpace]
a.addrSpaces[as] = &addrSpace{
id: dsConfigKey + "/" + as,
ds: a.addrSpaces[localAddressSpace].ds,
alloc: a.addrSpaces[localAddressSpace].alloc,
scope: a.addrSpaces[localAddressSpace].scope,
subnets: map[SubnetKey]*PoolData{},
}
_, _, _, err = a.RequestPool("", pool, "", nil, false)
if err == nil {
@ -637,12 +722,12 @@ func TestRequestSyntaxCheck(t *testing.T) {
t.Fatalf("Failed to detect wrong request: empty address space")
}
_, _, _, err = a.RequestPool(addrSpace, "", subPool, nil, false)
_, _, _, err = a.RequestPool(as, "", subPool, nil, false)
if err == nil {
t.Fatalf("Failed to detect wrong request: subPool specified and no pool")
}
pid, _, _, err := a.RequestPool(addrSpace, pool, subPool, nil, false)
pid, _, _, err := a.RequestPool(as, pool, subPool, nil, false)
if err != nil {
t.Fatalf("Unexpected failure: %v", err)
}
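The tests above register throwaway address spaces ("splane", "giallo", "rosso", "green") by copying fields from the local space by hand. A hypothetical helper, not part of this diff, that would factor out the repeated fixture (field names follow the addrSpace struct introduced here):

// Hypothetical test helper, not in this change: clone the local address
// space's backing store into a fresh, empty space under a new name.
func addTestAddrSpace(a *Allocator, name string) {
	local := a.addrSpaces[localAddressSpace]
	a.addrSpaces[name] = &addrSpace{
		id:      dsConfigKey + "/" + name,
		ds:      local.ds,
		alloc:   local.alloc,
		scope:   local.scope,
		subnets: map[SubnetKey]*PoolData{},
	}
}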
@ -764,6 +849,7 @@ func TestRelease(t *testing.T) {
for i, inp := range toRelease {
ip0 := net.ParseIP(inp.address)
a.ReleaseAddress(pid, ip0)
bm = a.addresses[SubnetKey{localAddressSpace, subnet, ""}]
if bm.Unselected() != 1 {
t.Fatalf("Failed to update free address count after release. Expected %d, Found: %d", i+1, bm.Unselected())
}

View File

@ -2,30 +2,30 @@ package ipam
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/types"
)
// Key provides the Key to be used in KV Store
func (cfg *PoolsConfig) Key() []string {
cfg.Lock()
defer cfg.Unlock()
return []string{cfg.id}
func (aSpace *addrSpace) Key() []string {
aSpace.Lock()
defer aSpace.Unlock()
return []string{aSpace.id}
}
// KeyPrefix returns the immediate parent key that can be used for tree walk
func (cfg *PoolsConfig) KeyPrefix() []string {
cfg.Lock()
defer cfg.Unlock()
func (aSpace *addrSpace) KeyPrefix() []string {
aSpace.Lock()
defer aSpace.Unlock()
return []string{dsConfigKey}
}
// Value marshals the data to be stored in the KV store
func (cfg *PoolsConfig) Value() []byte {
b, err := json.Marshal(cfg)
func (aSpace *addrSpace) Value() []byte {
b, err := json.Marshal(aSpace)
if err != nil {
log.Warnf("Failed to marshal ipam configured pools: %v", err)
return nil
@ -34,97 +34,94 @@ func (cfg *PoolsConfig) Value() []byte {
}
// SetValue unmarshals the data from the KV store.
func (cfg *PoolsConfig) SetValue(value []byte) error {
rc := &PoolsConfig{subnets: make(map[SubnetKey]*PoolData)}
func (aSpace *addrSpace) SetValue(value []byte) error {
rc := &addrSpace{subnets: make(map[SubnetKey]*PoolData)}
if err := json.Unmarshal(value, rc); err != nil {
return err
}
cfg.subnets = rc.subnets
aSpace.subnets = rc.subnets
return nil
}
// Index returns the latest DB Index as seen by this object
func (cfg *PoolsConfig) Index() uint64 {
cfg.Lock()
defer cfg.Unlock()
return cfg.dbIndex
func (aSpace *addrSpace) Index() uint64 {
aSpace.Lock()
defer aSpace.Unlock()
return aSpace.dbIndex
}
// SetIndex method allows the datastore to store the latest DB Index into this object
func (cfg *PoolsConfig) SetIndex(index uint64) {
cfg.Lock()
cfg.dbIndex = index
cfg.dbExists = true
cfg.Unlock()
func (aSpace *addrSpace) SetIndex(index uint64) {
aSpace.Lock()
aSpace.dbIndex = index
aSpace.dbExists = true
aSpace.Unlock()
}
// Exists method is true if this object has been stored in the DB.
func (cfg *PoolsConfig) Exists() bool {
cfg.Lock()
defer cfg.Unlock()
return cfg.dbExists
func (aSpace *addrSpace) Exists() bool {
aSpace.Lock()
defer aSpace.Unlock()
return aSpace.dbExists
}
// Skip provides a way for a KV Object to avoid persisting it in the KV Store
func (cfg *PoolsConfig) Skip() bool {
func (aSpace *addrSpace) Skip() bool {
return false
}
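Key, KeyPrefix, Value, SetValue, Index, SetIndex, Exists and Skip together are what let the generic store code persist an addrSpace. The contract they satisfy, inferred from this diff (the authoritative definition lives in libnetwork/datastore):

// Method set inferred from the implementations above; see
// libnetwork/datastore for the real definition.
type KVObject interface {
	Key() []string       // fully qualified key in the KV store
	KeyPrefix() []string // immediate parent key, used for tree walks
	Value() []byte       // marshalled payload
	SetValue([]byte) error
	Index() uint64 // last DB index seen, for atomic writes
	SetIndex(uint64)
	Exists() bool // already persisted?
	Skip() bool   // opt out of persistence entirely
}

The CopyTo and New methods added further down extend this with the constructor/cloning side of the contract.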
func (cfg *PoolsConfig) watchForChanges() error {
if cfg.ds == nil {
return nil
}
kvpChan, err := cfg.ds.KVStore().Watch(datastore.Key(cfg.Key()...), nil)
if err != nil {
return err
}
go func() {
for {
select {
case kvPair := <-kvpChan:
if kvPair != nil {
cfg.readFromKey(kvPair)
}
}
}
}()
return nil
func (a *Allocator) getStore(as string) datastore.DataStore {
a.Lock()
defer a.Unlock()
return a.addrSpaces[as].ds
}
func (cfg *PoolsConfig) writeToStore() error {
if cfg.ds == nil {
return nil
func (a *Allocator) getAddressSpaceFromStore(as string) (*addrSpace, error) {
store := a.getStore(as)
if store == nil {
return nil, fmt.Errorf("store for address space %s not found", as)
}
err := cfg.ds.PutObjectAtomic(cfg)
pc := &addrSpace{id: dsConfigKey + "/" + as, ds: store, alloc: a}
if err := store.GetObject(datastore.Key(pc.Key()...), pc); err != nil {
if err == datastore.ErrKeyNotFound {
return nil, nil
}
return nil, fmt.Errorf("could not get pools config from store: %v", err)
}
return pc, nil
}
func (a *Allocator) writeToStore(aSpace *addrSpace) error {
store := aSpace.store()
if store == nil {
return fmt.Errorf("invalid store while trying to write %s address space", aSpace.DataScope())
}
err := store.PutObjectAtomic(aSpace)
if err == datastore.ErrKeyModified {
return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err)
}
return err
}
func (cfg *PoolsConfig) readFromStore() error {
if cfg.ds == nil {
return nil
func (a *Allocator) deleteFromStore(aSpace *addrSpace) error {
store := aSpace.store()
if store == nil {
return fmt.Errorf("invalid store while trying to delete %s address space", aSpace.DataScope())
}
return cfg.ds.GetObject(datastore.Key(cfg.Key()...), cfg)
}
func (cfg *PoolsConfig) readFromKey(kvPair *store.KVPair) {
if cfg.dbIndex < kvPair.LastIndex {
cfg.SetValue(kvPair.Value)
cfg.dbIndex = kvPair.LastIndex
cfg.dbExists = true
}
}
func (cfg *PoolsConfig) deleteFromStore() error {
if cfg.ds == nil {
return nil
}
return cfg.ds.DeleteObjectAtomic(cfg)
return store.DeleteObjectAtomic(aSpace)
}
// DataScope method returns the storage scope of the datastore
func (cfg *PoolsConfig) DataScope() datastore.DataScope {
return cfg.scope
func (aSpace *addrSpace) DataScope() string {
aSpace.Lock()
defer aSpace.Unlock()
return aSpace.scope
}

View File

@ -27,13 +27,13 @@ type PoolData struct {
RefCount int
}
// PoolsConfig contains the pool configurations
type PoolsConfig struct {
// addrSpace contains the pool configurations for the address space
type addrSpace struct {
subnets map[SubnetKey]*PoolData
dbIndex uint64
dbExists bool
id string
scope datastore.DataScope
scope string
ds datastore.DataStore
alloc *Allocator
sync.Mutex
@ -153,18 +153,18 @@ func (p *PoolData) UnmarshalJSON(data []byte) error {
return nil
}
// MarshalJSON returns the JSON encoding of the PoolsConfig object
func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) {
cfg.Lock()
defer cfg.Unlock()
// MarshalJSON returns the JSON encoding of the addrSpace object
func (aSpace *addrSpace) MarshalJSON() ([]byte, error) {
aSpace.Lock()
defer aSpace.Unlock()
m := map[string]interface{}{
"Scope": string(cfg.scope),
"Scope": string(aSpace.scope),
}
if cfg.subnets != nil {
if aSpace.subnets != nil {
s := map[string]*PoolData{}
for k, v := range cfg.subnets {
for k, v := range aSpace.subnets {
s[k.String()] = v
}
m["Subnets"] = s
@ -173,10 +173,10 @@ func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) {
return json.Marshal(m)
}
// UnmarshalJSON decodes data into the PoolsConfig object
func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error {
cfg.Lock()
defer cfg.Unlock()
// UnmarshalJSON decodes data into the addrSpace object
func (aSpace *addrSpace) UnmarshalJSON(data []byte) error {
aSpace.Lock()
defer aSpace.Unlock()
m := map[string]interface{}{}
err := json.Unmarshal(data, &m)
@ -184,10 +184,10 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error {
return err
}
cfg.scope = datastore.LocalScope
aSpace.scope = datastore.LocalScope
s := m["Scope"].(string)
if s == string(datastore.GlobalScope) {
cfg.scope = datastore.GlobalScope
aSpace.scope = datastore.GlobalScope
}
if v, ok := m["Subnets"]; ok {
@ -200,31 +200,81 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error {
for ks, v := range s {
k := SubnetKey{}
k.FromString(ks)
cfg.subnets[k] = v
aSpace.subnets[k] = v
}
}
return nil
}
func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) {
cfg.Lock()
defer cfg.Unlock()
// CopyTo deep copies the pool data to the destination PoolData
func (p *PoolData) CopyTo(dstP *PoolData) error {
dstP.ParentKey = p.ParentKey
dstP.Pool = types.GetIPNetCopy(p.Pool)
if p.Range != nil {
dstP.Range = &AddressRange{}
dstP.Range.Sub = types.GetIPNetCopy(p.Range.Sub)
dstP.Range.Start = p.Range.Start
dstP.Range.End = p.Range.End
}
dstP.RefCount = p.RefCount
return nil
}
func (aSpace *addrSpace) CopyTo(o datastore.KVObject) error {
aSpace.Lock()
defer aSpace.Unlock()
dstAspace := o.(*addrSpace)
dstAspace.id = aSpace.id
dstAspace.ds = aSpace.ds
dstAspace.alloc = aSpace.alloc
dstAspace.scope = aSpace.scope
dstAspace.dbIndex = aSpace.dbIndex
dstAspace.dbExists = aSpace.dbExists
dstAspace.subnets = make(map[SubnetKey]*PoolData)
for k, v := range aSpace.subnets {
dstAspace.subnets[k] = &PoolData{}
v.CopyTo(dstAspace.subnets[k])
}
return nil
}
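New and CopyTo give the datastore layer a generic way to materialize and clone objects: New returns an empty shell with the unexported wiring (ds, alloc, scope) preserved, and CopyTo deep-copies the mutable state into it. A minimal sketch of cloning through that pair (interface and names are illustrative):

package sketch

// Cloner mirrors the New/CopyTo pair implemented by addrSpace above.
type Cloner interface {
	New() Cloner
	CopyTo(dst Cloner) error
}

// clone produces an independent copy: New preserves identity and wiring,
// CopyTo deep-copies the mutable state.
func clone(src Cloner) (Cloner, error) {
	dst := src.New()
	if err := src.CopyTo(dst); err != nil {
		return nil, err
	}
	return dst, nil
}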
func (aSpace *addrSpace) New() datastore.KVObject {
aSpace.Lock()
defer aSpace.Unlock()
return &addrSpace{
id: aSpace.id,
ds: aSpace.ds,
alloc: aSpace.alloc,
scope: aSpace.scope,
}
}
func (aSpace *addrSpace) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) {
aSpace.Lock()
defer aSpace.Unlock()
// Check if already allocated
if p, ok := cfg.subnets[k]; ok {
cfg.incRefCount(p, 1)
if p, ok := aSpace.subnets[k]; ok {
aSpace.incRefCount(p, 1)
return func() error { return nil }, nil
}
// If master pool, check for overlap
if ipr == nil {
if cfg.contains(k.AddressSpace, nw) {
if aSpace.contains(k.AddressSpace, nw) {
return nil, ipamapi.ErrPoolOverlap
}
// This is a new master pool, add it along with corresponding bitmask
cfg.subnets[k] = &PoolData{Pool: nw, RefCount: 1}
return func() error { return cfg.alloc.insertBitMask(cfg.ds, k, nw) }, nil
aSpace.subnets[k] = &PoolData{Pool: nw, RefCount: 1}
return func() error { return aSpace.alloc.insertBitMask(k, nw) }, nil
}
// This is a new non-master pool
@ -234,38 +284,38 @@ func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *Addre
Range: ipr,
RefCount: 1,
}
cfg.subnets[k] = p
aSpace.subnets[k] = p
// Look for parent pool
pp, ok := cfg.subnets[p.ParentKey]
pp, ok := aSpace.subnets[p.ParentKey]
if ok {
cfg.incRefCount(pp, 1)
aSpace.incRefCount(pp, 1)
return func() error { return nil }, nil
}
// Parent pool does not exist, add it along with corresponding bitmask
cfg.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1}
return func() error { return cfg.alloc.insertBitMask(cfg.ds, p.ParentKey, nw) }, nil
aSpace.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1}
return func() error { return aSpace.alloc.insertBitMask(p.ParentKey, nw) }, nil
}
func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) {
cfg.Lock()
defer cfg.Unlock()
func (aSpace *addrSpace) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) {
aSpace.Lock()
defer aSpace.Unlock()
p, ok := cfg.subnets[k]
p, ok := aSpace.subnets[k]
if !ok {
return nil, ipamapi.ErrBadPool
}
cfg.incRefCount(p, -1)
aSpace.incRefCount(p, -1)
c := p
for ok {
if c.RefCount == 0 {
delete(cfg.subnets, k)
delete(aSpace.subnets, k)
if c.Range == nil {
return func() error {
bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool)
bm, err := aSpace.alloc.retrieveBitmask(k, c.Pool)
if err != nil {
return fmt.Errorf("could not find bitmask in datastore for pool %s removal: %v", k.String(), err)
}
@ -274,24 +324,24 @@ func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error)
}
}
k = c.ParentKey
c, ok = cfg.subnets[k]
c, ok = aSpace.subnets[k]
}
return func() error { return nil }, nil
}
func (cfg *PoolsConfig) incRefCount(p *PoolData, delta int) {
func (aSpace *addrSpace) incRefCount(p *PoolData, delta int) {
c := p
ok := true
for ok {
c.RefCount += delta
c, ok = cfg.subnets[c.ParentKey]
c, ok = aSpace.subnets[c.ParentKey]
}
}
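incRefCount walks the ParentKey chain, so every sub-pool operation also adjusts each ancestor's count; that is why, in TestAddReleasePoolID above, requesting a sub-pool bumps the master pool from 1 to 2 and releasing it brings the master back to 1. A distilled sketch of the walk:

package sketch

type poolKey string

type poolData struct {
	parent   poolKey
	refCount int
}

// incRefCount mirrors the loop above: adjust the node, then every ancestor
// reachable through the parent chain.
func incRefCount(subnets map[poolKey]*poolData, k poolKey, delta int) {
	c, ok := subnets[k]
	for ok {
		c.refCount += delta
		c, ok = subnets[c.parent]
	}
}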
// Checks whether the passed subnet is a superset or subset of any of the subnets in this config db
func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool {
for k, v := range cfg.subnets {
func (aSpace *addrSpace) contains(space string, nw *net.IPNet) bool {
for k, v := range aSpace.subnets {
if space == k.AddressSpace && k.ChildSubnet == "" {
if nw.Contains(v.Pool.IP) || v.Pool.Contains(nw.IP) {
return true
@ -300,3 +350,10 @@ func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool {
}
return false
}
func (aSpace *addrSpace) store() datastore.DataStore {
aSpace.Lock()
defer aSpace.Unlock()
return aSpace.ds
}

View File

@ -6,7 +6,6 @@ import (
"net"
"testing"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/types"
@ -32,11 +31,6 @@ func TestDriverRegistration(t *testing.T) {
}
}
func SetTestDataStore(c NetworkController, custom datastore.DataStore) {
con := c.(*controller)
con.globalStore = custom
}
func TestNetworkMarshalling(t *testing.T) {
n := &network{
name: "Miao",

View File

@ -50,7 +50,7 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore()))
//libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore()))
x := m.Run()
controller.Stop()
@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
func createController() error {
var err error
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
option := options.Generic{
"EnableIPForwarding": true,
}
@ -358,27 +361,6 @@ func TestNilRemoteDriver(t *testing.T) {
}
}
func TestDuplicateNetwork(t *testing.T) {
if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)()
}
// Creating a default bridge name network (can't be removed)
_, err := controller.NewNetwork(bridgeNetType, "testdup")
if err != nil {
t.Fatal(err)
}
_, err = controller.NewNetwork(bridgeNetType, "testdup")
if err == nil {
t.Fatal("Expected to fail. But instead succeeded")
}
if _, ok := err.(libnetwork.NetworkNameError); !ok {
t.Fatalf("Did not fail with expected error. Actual error: %v", err)
}
}
func TestNetworkName(t *testing.T) {
if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)()
@ -703,7 +685,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) {
if netWanted == nil {
t.Fatal(err)
}
if net1 != netWanted {
if net1.ID() != netWanted.ID() {
t.Fatal(err)
}
@ -712,7 +694,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) {
if netWanted == nil {
t.Fatal(err)
}
if net2 != netWanted {
if net2.ID() != netWanted.ID() {
t.Fatal(err)
}
}
@ -843,7 +825,7 @@ func TestControllerQuery(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected failure for NetworkByID(): %v", err)
}
if net1 != g {
if net1.ID() != g.ID() {
t.Fatalf("NetworkByID() returned unexpected element: %v", g)
}
@ -863,7 +845,7 @@ func TestControllerQuery(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected failure for NetworkByID(): %v", err)
}
if net2 != g {
if net2.ID() != g.ID() {
t.Fatalf("NetworkByID() returned unexpected element: %v", g)
}
}
@ -940,7 +922,7 @@ func TestNetworkQuery(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if ep12 != e {
if ep12.ID() != e.ID() {
t.Fatalf("EndpointByID() returned %v instead of %v", e, ep12)
}

View File

@ -2,6 +2,7 @@ package libnetwork
import (
"encoding/json"
"fmt"
"net"
"sync"
@ -127,7 +128,6 @@ type network struct {
networkType string
id string
ipamType string
driver driverapi.Driver
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
@ -135,14 +135,14 @@ type network struct {
ipamV6Info []*IpamInfo
enableIPv6 bool
endpointCnt uint64
endpoints endpointTable
generic options.Generic
dbIndex uint64
svcRecords svcMap
dbExists bool
persist bool
stopWatchCh chan struct{}
dataScope datastore.DataScope
scope string
drvOnce *sync.Once
sync.Mutex
}
@ -164,11 +164,7 @@ func (n *network) Type() string {
n.Lock()
defer n.Unlock()
if n.driver == nil {
return ""
}
return n.driver.Type()
return n.networkType
}
func (n *network) Key() []string {
@ -220,10 +216,72 @@ func (n *network) Skip() bool {
return !n.persist
}
func (n *network) DataScope() datastore.DataScope {
func (n *network) New() datastore.KVObject {
n.Lock()
defer n.Unlock()
return n.dataScope
return &network{
ctrlr: n.ctrlr,
drvOnce: &sync.Once{},
}
}
// CopyTo deep copies to the destination IpamInfo
func (i *IpamInfo) CopyTo(dstI *IpamInfo) error {
dstI.PoolID = i.PoolID
if i.Meta != nil {
dstI.Meta = make(map[string]string)
for k, v := range i.Meta {
dstI.Meta[k] = v
}
}
dstI.AddressSpace = i.AddressSpace
dstI.Pool = types.GetIPNetCopy(i.Pool)
dstI.Gateway = types.GetIPNetCopy(i.Gateway)
if i.AuxAddresses != nil {
dstI.AuxAddresses = make(map[string]*net.IPNet)
for k, v := range i.AuxAddresses {
dstI.AuxAddresses[k] = types.GetIPNetCopy(v)
}
}
return nil
}
func (n *network) CopyTo(o datastore.KVObject) error {
n.Lock()
defer n.Unlock()
dstN := o.(*network)
dstN.name = n.name
dstN.id = n.id
dstN.networkType = n.networkType
dstN.ipamType = n.ipamType
dstN.endpointCnt = n.endpointCnt
dstN.enableIPv6 = n.enableIPv6
dstN.persist = n.persist
dstN.dbIndex = n.dbIndex
dstN.dbExists = n.dbExists
dstN.drvOnce = n.drvOnce
for _, v4info := range n.ipamV4Info {
dstV4Info := &IpamInfo{}
v4info.CopyTo(dstV4Info)
dstN.ipamV4Info = append(dstN.ipamV4Info, dstV4Info)
}
dstN.generic = options.Generic{}
for k, v := range n.generic {
dstN.generic[k] = v
}
return nil
}
func (n *network) DataScope() string {
return n.driverScope()
}
func (n *network) EndpointCnt() uint64 {
@ -232,16 +290,20 @@ func (n *network) EndpointCnt() uint64 {
return n.endpointCnt
}
func (n *network) IncEndpointCnt() {
func (n *network) IncEndpointCnt() error {
n.Lock()
n.endpointCnt++
n.Unlock()
return n.getController().updateToStore(n)
}
func (n *network) DecEndpointCnt() {
func (n *network) DecEndpointCnt() error {
n.Lock()
n.endpointCnt--
n.Unlock()
return n.getController().updateToStore(n)
}
// TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
@ -372,17 +434,55 @@ func (n *network) processOptions(options ...NetworkOption) {
}
}
func (n *network) Delete() error {
var err error
func (n *network) driverScope() string {
c := n.getController()
ctrlr := n.getController()
ctrlr.Lock()
_, ok := ctrlr.networks[n.id]
ctrlr.Unlock()
c.Lock()
// Check if a driver for the specified network type is available
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok {
return &UnknownNetworkError{name: n.name, id: n.id}
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
// If driver could not be resolved simply return an empty string
return ""
}
}
return dd.capability.DataScope
}
func (n *network) driver() (driverapi.Driver, error) {
c := n.getController()
c.Lock()
// Check if a driver for the specified network type is available
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok {
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
return nil, err
}
}
return dd.driver, nil
}
func (n *network) Delete() error {
n.Lock()
c := n.ctrlr
name := n.name
id := n.id
n.Unlock()
n, err := c.getNetworkFromStore(id)
if err != nil {
return &UnknownNetworkError{name: name, id: id}
}
numEps := n.EndpointCnt()
@ -390,9 +490,22 @@ func (n *network) Delete() error {
return &ActiveEndpointsError{name: n.name, id: n.id}
}
// deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help
// prevent any possible race between endpoint join and network delete
if err = ctrlr.deleteFromStore(n); err != nil {
if err = n.deleteNetwork(); err != nil {
return err
}
defer func() {
if err != nil {
if e := c.addNetwork(n); e != nil {
log.Warnf("failed to rollback deleteNetwork for network %s: %v",
n.Name(), err)
}
}
}()
// deleteFromStore performs an atomic delete operation and the
// network.endpointCnt field will help prevent any possible
// race between endpoint join and network delete
if err = n.getController().deleteFromStore(n); err != nil {
if err == datastore.ErrKeyModified {
return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
}
@ -402,65 +515,68 @@ func (n *network) Delete() error {
defer func() {
if err != nil {
n.dbExists = false
if e := ctrlr.updateToStore(n); e != nil {
if e := n.getController().updateToStore(n); e != nil {
log.Warnf("failed to recreate network in store %s : %v", n.name, e)
}
}
}()
if err = n.deleteNetwork(); err != nil {
return err
}
n.ipamRelease()
return nil
}
func (n *network) deleteNetwork() error {
n.Lock()
id := n.id
d := n.driver
n.ctrlr.Lock()
delete(n.ctrlr.networks, id)
n.ctrlr.Unlock()
n.Unlock()
d, err := n.driver()
if err != nil {
return fmt.Errorf("failed deleting network: %v", err)
}
if err := d.DeleteNetwork(n.id); err != nil {
// If it is a bridge network type, make sure we call the driver about the network
// because the network may have been created in some past life of libnetwork.
if n.Type() == "bridge" {
n.drvOnce.Do(func() {
err = n.getController().addNetwork(n)
})
if err != nil {
return err
}
}
if err := d.DeleteNetwork(n.ID()); err != nil {
// Forbidden Errors should be honored
if _, ok := err.(types.ForbiddenError); ok {
n.ctrlr.Lock()
n.ctrlr.networks[n.id] = n
n.ctrlr.Unlock()
return err
}
log.Warnf("driver error deleting network %s : %v", n.name, err)
}
n.stopWatch()
return nil
}
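Both deleteNetwork and addEndpoint use n.drvOnce.Do to re-register a pre-existing bridge network with the driver at most once per network object. Since sync.Once's Do returns nothing, the error is hoisted out through a captured variable; a sketch of the idiom (note that only the caller who triggers the single run observes a failure, later callers see nil, which is also true of the code above):

package sketch

import "sync"

type network struct {
	drvOnce *sync.Once // must be initialized by the constructor
}

// ensureRegistered runs register at most once for this network, surfacing
// the error from that single execution to whichever caller triggered it.
func (n *network) ensureRegistered(register func() error) error {
	var err error
	n.drvOnce.Do(func() {
		err = register()
	})
	return err
}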
func (n *network) addEndpoint(ep *endpoint) error {
var err error
n.Lock()
n.endpoints[ep.id] = ep
d := n.driver
n.Unlock()
d, err := n.driver()
if err != nil {
return fmt.Errorf("failed to add endpoint: %v", err)
}
defer func() {
// If it is a bridge network type, make sure we call the driver about the network
// because the network may have been created in some past life of libnetwork.
if n.Type() == "bridge" {
n.drvOnce.Do(func() {
err = n.getController().addNetwork(n)
})
if err != nil {
n.Lock()
delete(n.endpoints, ep.id)
n.Unlock()
return err
}
}()
}
err = d.CreateEndpoint(n.id, ep.id, ep.Interface(), ep.generic)
if err != nil {
return types.InternalErrorf("failed to create endpoint %s on network %s: %v", ep.Name(), n.Name(), err)
return types.InternalErrorf("failed to create endpoint %s on network %s: %v",
ep.Name(), n.Name(), err)
}
n.updateSvcRecord(ep, true)
return nil
}
@ -476,7 +592,16 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}}
ep.id = stringid.GenerateRandomID()
// Initialize ep.network with a possibly stale copy of n. We need this to get the network from
// the store. But once we get it from the store we will have the most up-to-date copy possible.
ep.network = n
ep.network, err = ep.getNetworkFromStore()
if err != nil {
return nil, fmt.Errorf("failed to get network during CreateEndpoint: %v", err)
}
n = ep.network
ep.processOptions(options...)
if err = ep.assignAddress(); err != nil {
@ -488,46 +613,46 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
}
}()
ctrlr := n.getController()
n.IncEndpointCnt()
if err = ctrlr.updateToStore(n); err != nil {
return nil, err
}
defer func() {
if err != nil {
n.DecEndpointCnt()
if err = ctrlr.updateToStore(n); err != nil {
log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err)
}
}
}()
if err = n.addEndpoint(ep); err != nil {
return nil, err
}
defer func() {
if err != nil {
if e := ep.Delete(); ep != nil {
if e := ep.deleteEndpoint(); e != nil {
log.Warnf("cleaning up endpoint failed %s : %v", name, e)
}
}
}()
if !ep.isLocalScoped() {
if err = ctrlr.updateToStore(ep); err != nil {
return nil, err
if err = n.getController().updateToStore(ep); err != nil {
return nil, err
}
defer func() {
if err != nil {
if e := n.getController().deleteFromStore(ep); e != nil {
log.Warnf("error rolling back endpoint %s from store: %v", name, e)
}
}
}()
// Increment endpoint count to indicate completion of endpoint addition
if err = n.IncEndpointCnt(); err != nil {
return nil, err
}
return ep, nil
}
func (n *network) Endpoints() []Endpoint {
n.Lock()
defer n.Unlock()
list := make([]Endpoint, 0, len(n.endpoints))
for _, e := range n.endpoints {
list = append(list, e)
var list []Endpoint
endpoints, err := n.getEndpointsFromStore()
if err != nil {
log.Error(err)
}
for _, ep := range endpoints {
list = append(list, ep)
}
return list
@ -568,28 +693,32 @@ func (n *network) EndpointByID(id string) (Endpoint, error) {
if id == "" {
return nil, ErrInvalidID(id)
}
n.Lock()
defer n.Unlock()
if e, ok := n.endpoints[id]; ok {
return e, nil
ep, err := n.getEndpointFromStore(id)
if err != nil {
return nil, ErrNoSuchEndpoint(id)
}
return nil, ErrNoSuchEndpoint(id)
return ep, nil
}
func (n *network) isGlobalScoped() bool {
return n.DataScope() == datastore.GlobalScope
}
func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool) {
c := n.getController()
sr, ok := c.svcDb[n.ID()]
if !ok {
c.svcDb[n.ID()] = svcMap{}
sr = c.svcDb[n.ID()]
}
func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) {
n.Lock()
var recs []etchosts.Record
if iface := ep.Iface(); iface.Address() != nil {
if isAdd {
n.svcRecords[ep.Name()] = iface.Address().IP
n.svcRecords[ep.Name()+"."+n.name] = iface.Address().IP
sr[ep.Name()] = iface.Address().IP
sr[ep.Name()+"."+n.name] = iface.Address().IP
} else {
delete(n.svcRecords, ep.Name())
delete(n.svcRecords, ep.Name()+"."+n.name)
delete(sr, ep.Name())
delete(sr, ep.Name()+"."+n.name)
}
recs = append(recs, etchosts.Record{
@ -610,12 +739,11 @@ func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) {
}
var sbList []*sandbox
n.WalkEndpoints(func(e Endpoint) bool {
if sb, hasSandbox := e.(*endpoint).getSandbox(); hasSandbox {
for _, ep := range localEps {
if sb, hasSandbox := ep.getSandbox(); hasSandbox {
sbList = append(sbList, sb)
}
return false
})
}
for _, sb := range sbList {
if isAdd {
@ -631,7 +759,9 @@ func (n *network) getSvcRecords() []etchosts.Record {
defer n.Unlock()
var recs []etchosts.Record
for h, ip := range n.svcRecords {
sr, _ := n.ctrlr.svcDb[n.id]
for h, ip := range sr {
recs = append(recs, etchosts.Record{
Hosts: h,
IP: ip.String(),
@ -799,7 +929,7 @@ func (n *network) deriveAddressSpace() (string, error) {
if !ok {
return "", types.NotFoundErrorf("could not find ipam driver %s to get default address space", n.ipamType)
}
if n.isGlobalScoped() {
if n.DataScope() == datastore.GlobalScope {
return ipd.defaultGlobalAddressSpace, nil
}
return ipd.defaultLocalAddressSpace, nil

View File

@ -247,6 +247,19 @@ func (sb *sandbox) getConnectedEndpoints() []*endpoint {
return eps
}
func (sb *sandbox) getEndpoint(id string) *endpoint {
sb.Lock()
defer sb.Unlock()
for _, ep := range sb.endpoints {
if ep.id == id {
return ep
}
}
return nil
}
func (sb *sandbox) updateGateway(ep *endpoint) error {
sb.Lock()
osSbox := sb.osSbox
@ -359,7 +372,13 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
return nil
}
func (sb *sandbox) clearNetworkResources(ep *endpoint) error {
func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
ep := sb.getEndpoint(origEp.id)
if ep == nil {
return fmt.Errorf("could not find the sandbox endpoint data for endpoint %s",
origEp.name)
}
sb.Lock()
osSbox := sb.osSbox
sb.Unlock()
@ -837,7 +856,7 @@ func (eh epHeap) Less(i, j int) bool {
cjp = 0
}
if cip == cjp {
return eh[i].getNetwork().Name() < eh[j].getNetwork().Name()
return eh[i].network.Name() < eh[j].network.Name()
}
return cip > cjp

View File

@ -115,21 +115,21 @@ func TestSandboxAddMultiPrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep3 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() {
t.Fatal("Expected ep3 to be at the top of the heap. But did not find ep3 at the top of the heap")
}
if err := ep3.Leave(sbx); err != nil {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep2 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() {
t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap")
}
if err := ep2.Leave(sbx); err != nil {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep1 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() {
t.Fatal("Expected ep1 to be at the top of the heap after removing ep2. But did not find ep1 at the top of the heap")
}
@ -138,7 +138,7 @@ func TestSandboxAddMultiPrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep3 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() {
t.Fatal("Expected ep3 to be at the top of the heap after adding ep3 back. But did not find ep3 at the top of the heap")
}
@ -185,7 +185,7 @@ func TestSandboxAddSamePrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep1 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() {
t.Fatal("Expected ep1 to be at the top of the heap. But did not find ep1 at the top of the heap")
}
@ -193,7 +193,7 @@ func TestSandboxAddSamePrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep2 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() {
t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap")
}

View File

@ -1,408 +1,348 @@
package libnetwork
import (
"encoding/json"
"fmt"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
)
var (
defaultBoltTimeout = 3 * time.Second
defaultLocalStoreConfig = config.DatastoreCfg{
Embedded: true,
Client: config.DatastoreClientCfg{
Provider: "boltdb",
Address: defaultPrefix + "/boltdb.db",
Config: &store.Config{
Bucket: "libnetwork",
ConnectionTimeout: defaultBoltTimeout,
},
},
}
)
func (c *controller) validateGlobalStoreConfig() bool {
return c.cfg != nil && c.cfg.GlobalStore.Client.Provider != "" && c.cfg.GlobalStore.Client.Address != ""
}
func (c *controller) initGlobalStore() error {
func (c *controller) initStores() error {
c.Lock()
cfg := c.cfg
c.Unlock()
if !c.validateGlobalStoreConfig() {
return fmt.Errorf("globalstore initialization requires a valid configuration")
}
store, err := datastore.NewDataStore(&cfg.GlobalStore)
if err != nil {
return err
}
c.Lock()
c.globalStore = store
c.Unlock()
return nil
}
func (c *controller) initLocalStore() error {
c.Lock()
cfg := c.cfg
c.Unlock()
localStore, err := datastore.NewDataStore(c.getLocalStoreConfig(cfg))
if err != nil {
return err
}
c.Lock()
c.localStore = localStore
c.Unlock()
return nil
}
func (c *controller) restoreFromGlobalStore() error {
c.Lock()
s := c.globalStore
c.Unlock()
if s == nil {
return nil
}
c.restore("global")
return c.watchNetworks()
}
func (c *controller) restoreFromLocalStore() error {
c.Lock()
s := c.localStore
c.Unlock()
if s != nil {
c.restore("local")
}
return nil
}
func (c *controller) restore(store string) {
nws, err := c.getNetworksFromStore(store == "global")
if err == nil {
c.processNetworkUpdate(nws, nil)
} else if err != datastore.ErrKeyNotFound {
log.Warnf("failed to read networks from %s store during init : %v", store, err)
}
}
func (c *controller) getNetworksFromStore(global bool) ([]*store.KVPair, error) {
var cs datastore.DataStore
c.Lock()
if global {
cs = c.globalStore
} else {
cs = c.localStore
}
c.Unlock()
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
}
func (c *controller) newNetworkFromStore(n *network) error {
n.Lock()
n.ctrlr = c
n.endpoints = endpointTable{}
n.Unlock()
return c.addNetwork(n)
}
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
ep.Lock()
n := ep.network
id := ep.id
ep.Unlock()
_, err := n.EndpointByID(id)
if err != nil {
if _, ok := err.(ErrNoSuchEndpoint); ok {
return n.addEndpoint(ep)
}
}
return err
}
func (c *controller) updateToStore(kvObject datastore.KV) error {
if kvObject.Skip() {
return nil
}
cs := c.getDataStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...))
return nil
}
return cs.PutObjectAtomic(kvObject)
}
func (c *controller) deleteFromStore(kvObject datastore.KV) error {
if kvObject.Skip() {
return nil
}
cs := c.getDataStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore not initialized. kv object %s is not deleted from datastore", datastore.Key(kvObject.Key()...))
return nil
}
if err := cs.DeleteObjectAtomic(kvObject); err != nil {
return err
}
return nil
}
func (c *controller) watchNetworks() error {
if !c.validateGlobalStoreConfig() {
return nil
}
c.Lock()
cs := c.globalStore
c.Unlock()
networkKey := datastore.Key(datastore.NetworkKeyPrefix)
if err := ensureKeys(networkKey, cs); err != nil {
return fmt.Errorf("failed to ensure if the network keys are valid and present in store: %v", err)
}
nwPairs, err := cs.KVStore().WatchTree(networkKey, nil)
if err != nil {
return err
}
go func() {
for {
select {
case nws := <-nwPairs:
c.Lock()
tmpview := networkTable{}
lview := c.networks
c.Unlock()
for k, v := range lview {
if v.isGlobalScoped() {
tmpview[k] = v
}
}
c.processNetworkUpdate(nws, &tmpview)
// Delete processing
for k := range tmpview {
c.Lock()
existing, ok := c.networks[k]
c.Unlock()
if !ok {
continue
}
tmp := network{}
if err := c.globalStore.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
continue
}
if err := existing.deleteNetwork(); err != nil {
log.Debugf("Delete failed %s: %s", existing.name, err)
}
}
}
}
}()
return nil
}
func (n *network) watchEndpoints() error {
if n.Skip() || !n.ctrlr.validateGlobalStoreConfig() {
return nil
}
n.Lock()
cs := n.ctrlr.globalStore
tmp := endpoint{network: n}
n.stopWatchCh = make(chan struct{})
stopCh := n.stopWatchCh
n.Unlock()
endpointKey := datastore.Key(tmp.KeyPrefix()...)
if err := ensureKeys(endpointKey, cs); err != nil {
return fmt.Errorf("failed to ensure if the endpoint keys are valid and present in store: %v", err)
}
epPairs, err := cs.KVStore().WatchTree(endpointKey, stopCh)
if err != nil {
return err
}
go func() {
for {
select {
case <-stopCh:
return
case eps := <-epPairs:
n.Lock()
tmpview := endpointTable{}
lview := n.endpoints
n.Unlock()
for k, v := range lview {
if v.network.isGlobalScoped() {
tmpview[k] = v
}
}
n.ctrlr.processEndpointsUpdate(eps, &tmpview)
// Delete processing
for k := range tmpview {
n.Lock()
existing, ok := n.endpoints[k]
n.Unlock()
if !ok {
continue
}
tmp := endpoint{}
if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
continue
}
if err := existing.deleteEndpoint(); err != nil {
log.Debugf("Delete failed %s: %s", existing.name, err)
}
}
}
}
}()
return nil
}
func (n *network) stopWatch() {
n.Lock()
if n.stopWatchCh != nil {
close(n.stopWatchCh)
n.stopWatchCh = nil
}
n.Unlock()
}
func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) {
for _, kve := range nws {
var n network
err := json.Unmarshal(kve.Value, &n)
if err != nil {
log.Error(err)
continue
}
if prune != nil {
delete(*prune, n.id)
}
n.SetIndex(kve.LastIndex)
c.Lock()
existing, ok := c.networks[n.id]
if c.cfg == nil {
c.Unlock()
if ok {
existing.Lock()
// Skip existing network update
if existing.dbIndex != n.Index() {
// Can't use SetIndex() since existing is locked.
existing.dbIndex = n.Index()
existing.dbExists = true
existing.endpointCnt = n.endpointCnt
}
existing.Unlock()
continue
}
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
}
}
}
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
nw := ep.network
if nw == nil {
return true
}
nw.Lock()
id := nw.id
nw.Unlock()
c.Lock()
n, ok := c.networks[id]
c.Unlock()
if !ok {
return true
}
existing, _ := n.EndpointByID(ep.id)
if existing == nil {
return true
}
ee := existing.(*endpoint)
ee.Lock()
if ee.dbIndex != ep.Index() {
// Can't use SetIndex() because ee is locked.
ee.dbIndex = ep.Index()
ee.dbExists = true
ee.sandboxID = ep.sandboxID
}
ee.Unlock()
return false
}
func ensureKeys(key string, cs datastore.DataStore) error {
exists, err := cs.KVStore().Exists(key)
if err != nil {
return err
}
if exists {
return nil
}
return cs.KVStore().Put(key, []byte{}, nil)
}
scopeConfigs := c.cfg.Scopes
c.Unlock()
func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCfg {
if cfg != nil && cfg.LocalStore.Client.Provider != "" && cfg.LocalStore.Client.Address != "" {
return &cfg.LocalStore
for scope, scfg := range scopeConfigs {
store, err := datastore.NewDataStore(scope, scfg)
if err != nil {
return err
}
c.Lock()
c.stores = append(c.stores, store)
c.Unlock()
}
return &defaultLocalStoreConfig
c.startWatch()
return nil
}
func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) {
func (c *controller) closeStores() {
for _, store := range c.getStores() {
store.Close()
}
}
func (c *controller) getStore(scope string) datastore.DataStore {
c.Lock()
if dataScope == datastore.GlobalScope {
dataStore = c.globalStore
} else if dataScope == datastore.LocalScope {
dataStore = c.localStore
defer c.Unlock()
for _, store := range c.stores {
if store.Scope() == scope {
return store
}
}
return nil
}
func (c *controller) getStores() []datastore.DataStore {
c.Lock()
defer c.Unlock()
return c.stores
}
func (c *controller) getNetworkFromStore(nid string) (*network, error) {
for _, store := range c.getStores() {
n := &network{id: nid, ctrlr: c}
err := store.GetObject(datastore.Key(n.Key()...), n)
if err != nil && err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("could not find network %s: %v", nid, err)
}
// Continue searching in the next store if the key is not found in this store
if err == datastore.ErrKeyNotFound {
continue
}
return n, nil
}
return nil, fmt.Errorf("network %s not found", nid)
}
func (c *controller) getNetworksFromStore() ([]*network, error) {
var nl []*network
for _, store := range c.getStores() {
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
&network{ctrlr: c})
if err != nil && err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("failed to get networks for scope %s: %v",
store.Scope(), err)
}
// Continue searching in the next store if no keys found in this store
if err == datastore.ErrKeyNotFound {
continue
}
for _, kvo := range kvol {
n := kvo.(*network)
n.ctrlr = c
nl = append(nl, n)
}
}
return nl, nil
}
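getNetworkFromStore and getNetworksFromStore search every configured store (local boltdb, optional global KV), treating ErrKeyNotFound as "try the next scope" and any other error as fatal. A distilled sketch of that lookup, with stand-in store and error types:

package sketch

import (
	"errors"
	"fmt"
)

var errKeyNotFound = errors.New("key not found")

type store interface {
	Get(key string) ([]byte, error)
	Scope() string
}

// getFromStores returns the first hit across the ordered store list,
// distinguishing "not in this scope, keep looking" from real failures.
func getFromStores(stores []store, key string) ([]byte, error) {
	for _, s := range stores {
		v, err := s.Get(key)
		if errors.Is(err, errKeyNotFound) {
			continue // not in this scope; try the next store
		}
		if err != nil {
			return nil, fmt.Errorf("scope %s: %v", s.Scope(), err)
		}
		return v, nil
	}
	return nil, fmt.Errorf("%s not found in any scope", key)
}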
func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
for _, store := range n.ctrlr.getStores() {
ep := &endpoint{id: eid, network: n}
err := store.GetObject(datastore.Key(ep.Key()...), ep)
if err != nil && err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("could not find endpoint %s: %v", eid, err)
}
// Continue searching in the next store if the key is not found in this store
if err == datastore.ErrKeyNotFound {
continue
}
return ep, nil
}
return nil, fmt.Errorf("endpoint %s not found", eid)
}
func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
var epl []*endpoint
tmp := endpoint{network: n}
for _, store := range n.getController().getStores() {
kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
if err != nil && err != datastore.ErrKeyNotFound {
return nil,
fmt.Errorf("failed to get endpoints for network %s scope %s: %v",
n.Name(), store.Scope(), err)
}
// Continue searching in the next store if no keys found in this store
if err == datastore.ErrKeyNotFound {
continue
}
for _, kvo := range kvol {
ep := kvo.(*endpoint)
ep.network = n
epl = append(epl, ep)
}
}
return epl, nil
}
func (c *controller) updateToStore(kvObject datastore.KVObject) error {
cs := c.getStore(kvObject.DataScope())
if cs == nil {
log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
return nil
}
if err := cs.PutObjectAtomic(kvObject); err != nil {
return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
}
return nil
}
func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
cs := c.getStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
return nil
}
retry:
if err := cs.DeleteObjectAtomic(kvObject); err != nil {
if err == datastore.ErrKeyModified {
if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
}
goto retry
}
return err
}
return nil
}
type netWatch struct {
localEps map[string]*endpoint
remoteEps map[string]*endpoint
stopCh chan struct{}
}
func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
c.Lock()
defer c.Unlock()
var epl []*endpoint
for _, ep := range nw.localEps {
epl = append(epl, ep)
}
return epl
}
func (c *controller) watchSvcRecord(ep *endpoint) {
c.watchCh <- ep
}
func (c *controller) unWatchSvcRecord(ep *endpoint) {
c.unWatchCh <- ep
}
func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) {
for {
select {
case <-nw.stopCh:
return
case o := <-nCh:
n := o.(*network)
epl, err := n.getEndpointsFromStore()
if err != nil {
break
}
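// Diff the stored endpoint list against what is already known: new
// remote endpoints get their svc records added, and endpoints that
// have disappeared from the store get theirs removed below.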
c.Lock()
var addEp []*endpoint
delEpMap := make(map[string]*endpoint)
for k, v := range nw.remoteEps {
delEpMap[k] = v
}
for _, lEp := range epl {
if _, ok := nw.localEps[lEp.ID()]; ok {
continue
}
if _, ok := nw.remoteEps[lEp.ID()]; ok {
delete(delEpMap, lEp.ID())
continue
}
nw.remoteEps[lEp.ID()] = lEp
addEp = append(addEp, lEp)
}
c.Unlock()
for _, lEp := range addEp {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
}
for _, lEp := range delEpMap {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
}
}
}
}
func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
c.Lock()
nw, ok := nmap[ep.getNetwork().ID()]
c.Unlock()
if ok {
// Update the svc db for the local endpoint join right away
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
c.Lock()
nw.localEps[ep.ID()] = ep
c.Unlock()
return
}
nw = &netWatch{
localEps: make(map[string]*endpoint),
remoteEps: make(map[string]*endpoint),
}
// Update the svc db for the local endpoint join right away
// Do this before adding this ep to localEps so that we don't
// try to update this ep's container's svc records
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
c.Lock()
nw.localEps[ep.ID()] = ep
nmap[ep.getNetwork().ID()] = nw
nw.stopCh = make(chan struct{})
c.Unlock()
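// Watch the network object in the store (if the backend supports
// watches) so remote endpoint changes keep local svc records current.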
store := c.getStore(ep.getNetwork().DataScope())
if store == nil {
return
}
if !store.Watchable() {
return
}
ch, err := store.Watch(ep.getNetwork(), nw.stopCh)
if err != nil {
log.Warnf("Error creating watch for network: %v", err)
return
}
go c.networkWatchLoop(nw, ep, ch)
}
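// processEndpointDelete undoes the bookkeeping above; once the last
// local endpoint leaves the network, the store watch is stopped too.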
func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
c.Lock()
nw, ok := nmap[ep.getNetwork().ID()]
if ok {
delete(nw.localEps, ep.ID())
c.Unlock()
// Update the svc db about local endpoint leave right away
// Do this after we remove this ep from localEps so that we
// don't try to remove this svc record from this ep's container.
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), false)
c.Lock()
if len(nw.localEps) == 0 {
close(nw.stopCh)
delete(nmap, ep.getNetwork().ID())
}
}
c.Unlock()
return
}
func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) {
for _, epe := range eps {
var ep endpoint
err := json.Unmarshal(epe.Value, &ep)
if err != nil {
log.Error(err)
continue
}
if prune != nil {
delete(*prune, ep.id)
}
ep.SetIndex(epe.LastIndex)
if nid, err := ep.networkIDFromKey(epe.Key); err != nil {
log.Error(err)
continue
} else {
if n, err := c.NetworkByID(nid); err != nil {
log.Error(err)
continue
} else {
ep.network = n.(*network)
}
}
if c.processEndpointUpdate(&ep) {
err = c.newEndpointFromStore(epe.Key, &ep)
if err != nil {
log.Error(err)
}
}
}
}
func (c *controller) watchLoop(nmap map[string]*netWatch) {
for {
select {
case ep := <-c.watchCh:
c.processEndpointCreate(nmap, ep)
case ep := <-c.unWatchCh:
c.processEndpointDelete(nmap, ep)
}
}
}
func (c *controller) startWatch() {
c.watchCh = make(chan *endpoint)
c.unWatchCh = make(chan *endpoint)
nmap := make(map[string]*netWatch)
go c.watchLoop(nmap)
}
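// Flow overview: an endpoint join calls watchSvcRecord, and the loop
// above dispatches it to processEndpointCreate (which starts a
// per-network store watch); a leave goes through unWatchSvcRecord to
// processEndpointDelete, which stops the watch with the last endpoint.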

View File

@ -33,7 +33,7 @@ func testNewController(t *testing.T, provider, url string) (NetworkController, e
}
func TestBoltdbBackend(t *testing.T) {
defer os.Remove(defaultLocalStoreConfig.Client.Address)
defer os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
testLocalBackend(t, "", "", nil)
defer os.Remove("/tmp/boltdb.db")
config := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second}
@ -64,7 +64,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.(*controller).localStore.KVStore()
store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore()
if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); !exists || err != nil {
t.Fatalf("Network key should have been created.")
}
@ -100,7 +100,7 @@ func TestNoPersist(t *testing.T) {
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.(*controller).localStore.KVStore()
store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore()
if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); exists {
t.Fatalf("Network with persist=false should not be stored in KV Store")
}
@ -138,12 +138,8 @@ func TestLocalStoreLockTimeout(t *testing.T) {
}
defer ctrl1.Stop()
// Use the same boltdb file without closing the previous controller
ctrl2, err := New(cfgOptions...)
if err != nil {
t.Fatalf("Error new controller: %v", err)
}
store := ctrl2.(*controller).localStore
if store != nil {
t.Fatalf("localstore is expected to be nil")
_, err = New(cfgOptions...)
if err == nil {
t.Fatalf("Expected to fail but succeeded")
}
}

View File

@ -0,0 +1,157 @@
#!/usr/bin/env bats
# -*- mode: sh -*-
load helpers
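# Creates containers 1..$2 on network $1, verifies pairwise reachability
# by pinging container names, then disconnects and removes everything.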
function test_single_network_connectivity() {
local nw_name start end
nw_name=${1}
start=1
end=${2}
# Create containers and connect them to the network
for i in `seq ${start} ${end}`;
do
dnet_cmd $(inst_id2port 1) container create container_${i}
net_connect 1 container_${i} ${nw_name}
done
# Now test connectivity between all the containers using service names
for i in `seq ${start} ${end}`;
do
for j in `seq ${start} ${end}`;
do
if [ "$i" -eq "$j" ]; then
continue
fi
runc $(dnet_container_name 1 bridge) $(get_sbox_id 1 container_${i}) \
"ping -c 1 container_${j}"
done
done
# Teardown the container connections and the network
for i in `seq ${start} ${end}`;
do
net_disconnect 1 container_${i} ${nw_name}
dnet_cmd $(inst_id2port 1) container rm container_${i}
done
}
@test "Test default bridge network" {
skip_for_circleci
echo $(docker ps)
test_single_network_connectivity bridge 3
}
@test "Test bridge network" {
skip_for_circleci
echo $(docker ps)
dnet_cmd $(inst_id2port 1) network create -d bridge singlehost
test_single_network_connectivity singlehost 3
dnet_cmd $(inst_id2port 1) network rm singlehost
}
@test "Test bridge network dnet restart" {
skip_for_circleci
echo $(docker ps)
dnet_cmd $(inst_id2port 1) network create -d bridge singlehost
for iter in `seq 1 2`;
do
test_single_network_connectivity singlehost 3
docker restart dnet-1-bridge
sleep 2
done
dnet_cmd $(inst_id2port 1) network rm singlehost
}
@test "Test multiple bridge networks" {
skip_for_circleci
echo $(docker ps)
start=1
end=3
for i in `seq ${start} ${end}`;
do
dnet_cmd $(inst_id2port 1) container create container_${i}
for j in `seq ${start} ${end}`;
do
if [ "$i" -eq "$j" ]; then
continue
fi
if [ "$i" -lt "$j" ]; then
dnet_cmd $(inst_id2port 1) network create -d bridge sh${i}${j}
nw=sh${i}${j}
else
nw=sh${j}${i}
fi
osvc="svc${i}${j}"
dnet_cmd $(inst_id2port 1) service publish ${osvc}.${nw}
dnet_cmd $(inst_id2port 1) service attach container_${i} ${osvc}.${nw}
done
done
for i in `seq ${start} ${end}`;
do
echo ${i}
for j in `seq ${start} ${end}`;
do
echo ${j}
if [ "$i" -eq "$j" ]; then
continue
fi
osvc="svc${j}${i}"
echo "pinging ${osvc}"
dnet_cmd $(inst_id2port 1) service ls
runc $(dnet_container_name 1 bridge) $(get_sbox_id 1 container_${i}) "cat /etc/hosts"
runc $(dnet_container_name 1 bridge) $(get_sbox_id 1 container_${i}) "ping -c 1 ${osvc}"
done
done
for i in `seq ${start} ${end}`;
do
for j in `seq ${start} ${end}`;
do
if [ "$i" -eq "$j" ]; then
continue
fi
if [ "$i" -lt "$j" ]; then
nw=sh${i}${j}
else
nw=sh${j}${i}
fi
osvc="svc${i}${j}"
dnet_cmd $(inst_id2port 1) service detach container_${i} ${osvc}.${nw}
dnet_cmd $(inst_id2port 1) service unpublish ${osvc}.${nw}
done
dnet_cmd $(inst_id2port 1) container rm container_${i}
done
for i in `seq ${start} ${end}`;
do
for j in `seq ${start} ${end}`;
do
if [ "$i" -eq "$j" ]; then
continue
fi
if [ "$i" -lt "$j" ]; then
dnet_cmd $(inst_id2port 1) network rm sh${i}${j}
fi
done
done
}

View File

@ -6,6 +6,23 @@ function dnet_container_name() {
echo dnet-$1-$2
}
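# Prints the sandbox id for container $2 on dnet instance $1 (fifth
# column of 'service ls').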
function get_sbox_id() {
local line
line=$(dnet_cmd $(inst_id2port ${1}) service ls | grep ${2})
echo ${line} | cut -d" " -f5
}
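# net_connect/net_disconnect emulate connecting a container to a
# network: publish a service named <container>.<network>, then attach
# or detach the container.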
function net_connect() {
dnet_cmd $(inst_id2port ${1}) service publish ${2}.${3}
dnet_cmd $(inst_id2port ${1}) service attach ${2} ${2}.${3}
}
function net_disconnect() {
dnet_cmd $(inst_id2port ${1}) service detach ${2} ${2}.${3}
dnet_cmd $(inst_id2port ${1}) service unpublish ${2}.${3}
}
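# Typical usage from the bats tests: net_connect 1 container_1 <network>,
# ping through the sandbox with runc/get_sbox_id, then
# net_disconnect 1 container_1 <network> to tear down.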
function start_consul() {
stop_consul
docker run -d \
@ -28,6 +45,7 @@ function stop_consul() {
}
function start_dnet() {
local inst suffix name hport cport hopt neighip bridge_ip labels tomlfile
inst=$1
shift
suffix=$1
@ -39,7 +57,6 @@ function start_dnet() {
hport=$((41000+${inst}-1))
cport=2385
hopt=""
isnum='^[0-9]+$'
while [ -n "$1" ]
do
@ -62,10 +79,12 @@ function start_dnet() {
labels="\"com.docker.network.driver.overlay.bind_interface=eth0\", \"com.docker.network.driver.overlay.neighbor_ip=${neighip}\""
fi
echo "parsed values: " ${name} ${hport} ${cport} ${hopt} ${neighip} ${labels}
mkdir -p /tmp/dnet/${name}
tomlfile="/tmp/dnet/${name}/libnetwork.toml"
cat > ${tomlfile} <<EOF
title = "LibNetwork Configuration file"
title = "LibNetwork Configuration file for ${name}"
[daemon]
debug = false
@ -73,13 +92,13 @@ title = "LibNetwork Configuration file"
[cluster]
discovery = "consul://${bridge_ip}:8500"
Heartbeat = 10
[globalstore]
embedded = false
[globalstore.client]
provider = "consul"
Address = "${bridge_ip}:8500"
[scopes]
[scopes.global]
embedded = false
[scopes.global.client]
provider = "consul"
address = "${bridge_ip}:8500"
EOF
echo "parsed values: " ${name} ${hport} ${cport} ${hopt}
docker run \
-d \
--name=${name} \
@ -101,6 +120,8 @@ function skip_for_circleci() {
}
function stop_dnet() {
local name
name=$(dnet_container_name $1 $2)
rm -rf /tmp/dnet/${name} || true
docker stop ${name} || true
@ -111,6 +132,8 @@ function stop_dnet() {
}
function dnet_cmd() {
local hport
hport=$1
shift
./cmd/dnet/dnet -H tcp://127.0.0.1:${hport} $*
@ -121,6 +144,8 @@ function dnet_exec() {
}
function runc() {
local dnet
dnet=${1}
shift
dnet_exec ${dnet} "cp /var/lib/docker/network/files/${1}*/* /scratch/rootfs/etc"

View File

@ -1,7 +1,19 @@
#!/usr/bin/env bats
# -*- mode: sh -*-
load helpers
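# Echoes "true" if dnet instance $1 lists a network named $2 using
# driver $3, "false" otherwise.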
function is_network_exist() {
line=$(dnet_cmd $(inst_id2port $1) network ls | grep ${2})
name=$(echo ${line} | cut -d" " -f2)
driver=$(echo ${line} | cut -d" " -f3)
if [ "$name" == "$2" -a "$driver" == "$3" ]; then
echo "true"
else
echo "false"
fi
}
@test "Test multinode network create" {
echo $(docker ps)
for i in `seq 1 3`;
@ -13,19 +25,17 @@ load helpers
for j in `seq 1 3`;
do
line=$(dnet_cmd $(inst_id2port $j) network ls | grep ${oname})
echo ${output}
[ "$status" -eq 0 ]
echo ${line}
name=$(echo ${line} | cut -d" " -f2)
driver=$(echo ${line} | cut -d" " -f3)
echo ${name} ${driver}
[ "$name" = "$oname" ]
[ "$driver" = "test" ]
result=$(is_network_exist $j ${oname} test)
[ "$result" = "true" ]
done
# Always try to remove the network from the first node
dnet_cmd $(inst_id2port 1) network rm ${oname}
# Always try to remove the network from the second node
dnet_cmd $(inst_id2port 2) network rm ${oname}
echo "delete ${oname}"
nresult=$(is_network_exist 1 ${oname} test)
echo ${nresult}
dnet_cmd $(inst_id2port 1) network ls
[ "$nresult" = "false" ]
done
}
@ -97,6 +107,7 @@ load helpers
dnet_cmd $(inst_id2port $i) network create -d test ${oname}
dnet_cmd $(inst_id2port $i) service publish ${osvc}.${oname}
dnet_cmd $(inst_id2port $i) container create container_${i}
dnet_cmd $(inst_id2port $i) network ls
dnet_cmd $(inst_id2port $i) service attach container_${i} ${osvc}.${oname}
for j in `seq 1 3`;

View File

@ -14,35 +14,28 @@ load helpers
dnet_cmd $(inst_id2port 1) network create -d overlay multihost
for i in `seq ${start} ${end}`;
do
osvc="svc$i"
dnet_cmd $(inst_id2port $i) service publish ${osvc}.multihost
dnet_cmd $(inst_id2port $i) container create container_${i}
dnet_cmd $(inst_id2port $i) service attach container_${i} ${osvc}.multihost
net_connect ${i} container_${i} multihost
done
# Now test connectivity between all the containers using service names
for i in `seq ${start} ${end}`;
do
src="svc$i"
line=$(dnet_cmd $(inst_id2port $i) service ls | grep ${src})
echo ${line}
sbid=$(echo ${line} | cut -d" " -f5)
for j in `seq ${start} ${end}`;
do
if [ "$i" -eq "$j" ]; then
continue
fi
runc $(dnet_container_name $i overlay) ${sbid} "ping -c 1 svc$j"
runc $(dnet_container_name $i overlay) $(get_sbox_id ${i} container_${i}) \
"ping -c 1 container_$j"
done
done
# Teardown the container connections and the network
for i in `seq ${start} ${end}`;
do
osvc="svc$i"
dnet_cmd $(inst_id2port $i) service detach container_${i} ${osvc}.multihost
net_disconnect ${i} container_${i} multihost
dnet_cmd $(inst_id2port $i) container rm container_${i}
dnet_cmd $(inst_id2port $i) service unpublish ${osvc}.multihost
done
run dnet_cmd $(inst_id2port 2) network rm multihost

View File

@ -78,6 +78,18 @@ unset cmap[dnet-2-multi]
stop_dnet 3 multi 1>>${INTEGRATION_ROOT}/test.log 2>&1
unset cmap[dnet-3-multi]
## Setup
start_dnet 1 bridge 1>>${INTEGRATION_ROOT}/test.log 2>&1
cmap[dnet-1-bridge]=dnet-1-bridge
## Run the test cases
./integration-tmp/bin/bats ./test/integration/dnet/bridge.bats
#docker logs dnet-1-bridge
## Teardown
stop_dnet 1 bridge 1>>${INTEGRATION_ROOT}/test.log 2>&1
unset cmap[dnet-1-bridge]
## Setup
start_dnet 1 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1
cmap[dnet-1-overlay]=dnet-1-overlay

View File

@ -7,6 +7,8 @@ load helpers
run dnet_cmd $(inst_id2port 1) network create -d test mh1
echo ${output}
[ "$status" -eq 0 ]
run dnet_cmd $(inst_id2port 1) network ls
echo ${output}
line=$(dnet_cmd $(inst_id2port 1) network ls | grep mh1)
echo ${line}
name=$(echo ${line} | cut -d" " -f2)
@ -32,9 +34,9 @@ load helpers
echo ${output}
[ "$status" -eq 0 ]
run dnet_cmd $(inst_id2port 1) service ls
[ "$status" -eq 0 ]
echo ${output}
echo ${lines[1]}
[ "$status" -eq 0 ]
svc=$(echo ${lines[1]} | cut -d" " -f2)
network=$(echo ${lines[1]} | cut -d" " -f3)
echo ${svc} ${network}