Remove always-on watch for networks and endpoints

Always on watching of networks and endpoints can
affect scalability of the cluster beyond a few nodes.
Remove pro active watching and watch only the objects
you are interested in.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2015-10-05 04:21:15 -07:00
parent d74384b1d4
commit 71e14dd52a
18 changed files with 1022 additions and 799 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/docker/docker/pkg/reexec"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
"github.com/docker/libnetwork/testutils"
@ -88,11 +89,13 @@ func i2sbL(i interface{}) []*sandboxResource {
}
func createTestNetwork(t *testing.T, network string) (libnetwork.NetworkController, libnetwork.Network) {
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
}
defer c.Stop()
netOption := options.Generic{
netlabel.GenericData: options.Generic{
@ -175,6 +178,9 @@ func TestJson(t *testing.T) {
func TestCreateDeleteNetwork(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -249,6 +255,9 @@ func TestCreateDeleteNetwork(t *testing.T) {
func TestGetNetworksAndEndpoints(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -518,6 +527,9 @@ func TestGetNetworksAndEndpoints(t *testing.T) {
func TestProcGetServices(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -686,6 +698,7 @@ func TestProcGetService(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
ep1, err := nw.CreateEndpoint("db")
if err != nil {
t.Fatal(err)
@ -738,6 +751,8 @@ func TestProcPublishUnpublishService(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, _ := createTestNetwork(t, "network")
defer c.Stop()
vars := make(map[string]string)
vbad, err := json.Marshal("bad service create data")
@ -870,6 +885,7 @@ func TestAttachDetachBackend(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
ep1, err := nw.CreateEndpoint("db")
if err != nil {
t.Fatal(err)
@ -994,6 +1010,9 @@ func TestAttachDetachBackend(t *testing.T) {
}
func TestDetectGetNetworksInvalidQueryComposition(t *testing.T) {
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1011,6 +1030,7 @@ func TestDetectGetEndpointsInvalidQueryComposition(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, _ := createTestNetwork(t, "network")
defer c.Stop()
vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"}
_, errRsp := procGetEndpoints(c, vars, nil)
@ -1023,6 +1043,7 @@ func TestDetectGetServicesInvalidQueryComposition(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, _ := createTestNetwork(t, "network")
defer c.Stop()
vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"}
_, errRsp := procGetServices(c, vars, nil)
@ -1040,6 +1061,8 @@ func TestFindNetworkUtil(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
nid := nw.ID()
_, errRsp := findNetwork(c, "", byName)
@ -1102,6 +1125,9 @@ func TestFindNetworkUtil(t *testing.T) {
func TestCreateDeleteEndpoints(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1225,6 +1251,9 @@ func TestCreateDeleteEndpoints(t *testing.T) {
func TestJoinLeave(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1382,6 +1411,8 @@ func TestFindEndpointUtilPanic(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
defer checkPanic(t)
c, nw := createTestNetwork(t, "network")
defer c.Stop()
nid := nw.ID()
findEndpoint(c, nid, "", byID, -1)
}
@ -1390,6 +1421,8 @@ func TestFindServiceUtilPanic(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
defer checkPanic(t)
c, _ := createTestNetwork(t, "network")
defer c.Stop()
findService(c, "random_service", -1)
}
@ -1397,6 +1430,8 @@ func TestFindEndpointUtil(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
c, nw := createTestNetwork(t, "network")
defer c.Stop()
nid := nw.ID()
ep, err := nw.CreateEndpoint("secondEp", nil)
@ -1443,7 +1478,8 @@ func TestFindEndpointUtil(t *testing.T) {
t.Fatalf("Unexepected failure: %v", errRsp)
}
if ep0 != ep1 || ep0 != ep2 || ep0 != ep3 || ep0 != ep4 || ep0 != ep5 {
if ep0.ID() != ep1.ID() || ep0.ID() != ep2.ID() ||
ep0.ID() != ep3.ID() || ep0.ID() != ep4.ID() || ep0.ID() != ep5.ID() {
t.Fatalf("Diffenrent queries returned different endpoints")
}
@ -1665,6 +1701,9 @@ func TestwriteJSON(t *testing.T) {
func TestHttpHandlerUninit(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1732,6 +1771,9 @@ func TestHttpHandlerBadBody(t *testing.T) {
rsp := newWriter()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -1765,6 +1807,9 @@ func TestEndToEnd(t *testing.T) {
rsp := newWriter()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)
@ -2213,6 +2258,9 @@ func TestEndToEndErrorMessage(t *testing.T) {
rsp := newWriter()
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
c, err := libnetwork.New()
if err != nil {
t.Fatal(err)

View File

@ -22,10 +22,12 @@ import (
"github.com/docker/docker/pkg/reexec"
"github.com/Sirupsen/logrus"
psignal "github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/term"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/api"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
@ -76,6 +78,7 @@ func processConfig(cfg *config.Config) []config.Option {
if cfg == nil {
return options
}
dn := "bridge"
if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" {
dn = cfg.Daemon.DefaultNetwork
@ -91,12 +94,12 @@ func processConfig(cfg *config.Config) []config.Option {
if cfg.Daemon.Labels != nil {
options = append(options, config.OptionLabels(cfg.Daemon.Labels))
}
if strings.TrimSpace(cfg.GlobalStore.Client.Provider) != "" {
options = append(options, config.OptionKVProvider(cfg.GlobalStore.Client.Provider))
}
if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" {
options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address))
if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
options = append(options, config.OptionKVProvider(dcfg.Client.Provider))
options = append(options, config.OptionKVProviderURL(dcfg.Client.Address))
}
dOptions, err := startDiscovery(&cfg.Cluster)
if err != nil {
logrus.Infof("Skipping discovery : %s", err.Error())
@ -182,8 +185,9 @@ func createDefaultNetwork(c libnetwork.NetworkController) {
genericOption[netlabel.GenericData] = map[string]interface{}{
"BridgeName": nw,
}
networkOption := libnetwork.NetworkOptionGeneric(genericOption)
createOptions = append(createOptions, networkOption)
createOptions = append(createOptions,
libnetwork.NetworkOptionGeneric(genericOption),
libnetwork.NetworkOptionPersist(false))
}
_, err := c.NewNetwork(d, nw, createOptions...)
if err != nil {
@ -214,6 +218,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
fmt.Println("Error starting dnetDaemon :", err)
return err
}
createDefaultNetwork(controller)
httpHandler := api.NewHTTPHandler(controller)
r := mux.NewRouter().StrictSlash(false)
@ -231,10 +236,21 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler)
handleSignals(controller)
setupDumpStackTrap()
return http.ListenAndServe(d.addr, r)
}
func setupDumpStackTrap() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGUSR1)
go func() {
for range c {
psignal.DumpStacks()
}
}()
}
func handleSignals(controller libnetwork.NetworkController) {
c := make(chan os.Signal, 1)
signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}

View File

@ -7,19 +7,21 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/netlabel"
)
// Config encapsulates configurations of various Libnetwork components
type Config struct {
Daemon DaemonCfg
Cluster ClusterCfg
GlobalStore, LocalStore DatastoreCfg
Daemon DaemonCfg
Cluster ClusterCfg
Scopes map[string]*datastore.ScopeCfg
}
// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
@ -34,26 +36,28 @@ type ClusterCfg struct {
Heartbeat uint64
}
// DatastoreCfg represents Datastore configuration.
type DatastoreCfg struct {
Embedded bool
Client DatastoreClientCfg
}
// DatastoreClientCfg represents Datastore Client-only mode configuration
type DatastoreClientCfg struct {
Provider string
Address string
Config *store.Config
// LoadDefaultScopes loads default scope configs for scopes which
// doesn't have explicit user specified configs.
func (c *Config) LoadDefaultScopes(dataDir string) {
for k, v := range datastore.DefaultScopes(dataDir) {
if _, ok := c.Scopes[k]; !ok {
c.Scopes[k] = v
}
}
}
// ParseConfig parses the libnetwork configuration file
func ParseConfig(tomlCfgFile string) (*Config, error) {
var cfg Config
if _, err := toml.DecodeFile(tomlCfgFile, &cfg); err != nil {
cfg := &Config{
Scopes: map[string]*datastore.ScopeCfg{},
}
if _, err := toml.DecodeFile(tomlCfgFile, cfg); err != nil {
return nil, err
}
return &cfg, nil
cfg.LoadDefaultScopes(cfg.Daemon.DataDir)
return cfg, nil
}
// Option is a option setter function type used to pass varios configurations
@ -98,7 +102,10 @@ func OptionLabels(labels []string) Option {
func OptionKVProvider(provider string) Option {
return func(c *Config) {
log.Infof("Option OptionKVProvider: %s", provider)
c.GlobalStore.Client.Provider = strings.TrimSpace(provider)
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.GlobalScope].Client.Provider = strings.TrimSpace(provider)
}
}
@ -106,7 +113,10 @@ func OptionKVProvider(provider string) Option {
func OptionKVProviderURL(url string) Option {
return func(c *Config) {
log.Infof("Option OptionKVProviderURL: %s", url)
c.GlobalStore.Client.Address = strings.TrimSpace(url)
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.GlobalScope].Client.Address = strings.TrimSpace(url)
}
}
@ -124,6 +134,13 @@ func OptionDiscoveryAddress(address string) Option {
}
}
// OptionDataDir function returns an option setter for data folder
func OptionDataDir(dataDir string) Option {
return func(c *Config) {
c.Daemon.DataDir = dataDir
}
}
// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {
@ -145,7 +162,10 @@ func IsValidName(name string) bool {
func OptionLocalKVProvider(provider string) Option {
return func(c *Config) {
log.Infof("Option OptionLocalKVProvider: %s", provider)
c.LocalStore.Client.Provider = strings.TrimSpace(provider)
if _, ok := c.Scopes[datastore.LocalScope]; !ok {
c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.LocalScope].Client.Provider = strings.TrimSpace(provider)
}
}
@ -153,7 +173,10 @@ func OptionLocalKVProvider(provider string) Option {
func OptionLocalKVProviderURL(url string) Option {
return func(c *Config) {
log.Infof("Option OptionLocalKVProviderURL: %s", url)
c.LocalStore.Client.Address = strings.TrimSpace(url)
if _, ok := c.Scopes[datastore.LocalScope]; !ok {
c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.LocalScope].Client.Address = strings.TrimSpace(url)
}
}
@ -161,6 +184,9 @@ func OptionLocalKVProviderURL(url string) Option {
func OptionLocalKVProviderConfig(config *store.Config) Option {
return func(c *Config) {
log.Infof("Option OptionLocalKVProviderConfig: %v", config)
c.LocalStore.Client.Config = config
if _, ok := c.Scopes[datastore.LocalScope]; !ok {
c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{}
}
c.Scopes[datastore.LocalScope].Client.Config = config
}
}

View File

@ -124,73 +124,71 @@ type ipamData struct {
}
type driverTable map[string]*driverData
//type networkTable map[string]*network
//type endpointTable map[string]*endpoint
type ipamTable map[string]*ipamData
type networkTable map[string]*network
type endpointTable map[string]*endpoint
type sandboxTable map[string]*sandbox
type controller struct {
id string
networks networkTable
drivers driverTable
ipamDrivers ipamTable
sandboxes sandboxTable
cfg *config.Config
globalStore, localStore datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
id string
//networks networkTable
drivers driverTable
ipamDrivers ipamTable
sandboxes sandboxTable
cfg *config.Config
stores []datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
watchCh chan *endpoint
unWatchCh chan *endpoint
svcDb map[string]svcMap
sync.Mutex
}
// New creates a new instance of network controller.
func New(cfgOptions ...config.Option) (NetworkController, error) {
var cfg *config.Config
cfg = &config.Config{
Daemon: config.DaemonCfg{
DriverCfg: make(map[string]interface{}),
},
Scopes: make(map[string]*datastore.ScopeCfg),
}
if len(cfgOptions) > 0 {
cfg = &config.Config{
Daemon: config.DaemonCfg{
DriverCfg: make(map[string]interface{}),
},
}
cfg.ProcessOptions(cfgOptions...)
}
cfg.LoadDefaultScopes(cfg.Daemon.DataDir)
c := &controller{
id: stringid.GenerateRandomID(),
cfg: cfg,
networks: networkTable{},
sandboxes: sandboxTable{},
drivers: driverTable{},
ipamDrivers: ipamTable{}}
if err := initDrivers(c); err != nil {
ipamDrivers: ipamTable{},
svcDb: make(map[string]svcMap),
}
if err := c.initStores(); err != nil {
return nil, err
}
if cfg != nil {
if err := c.initGlobalStore(); err != nil {
// Failing to initalize datastore is a bad situation to be in.
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err)
}
if err := c.initLocalStore(); err != nil {
log.Debugf("Failed to Initialize LocalDatastore due to %v.", err)
}
}
if err := initIpams(c, c.localStore, c.globalStore); err != nil {
return nil, err
}
if cfg != nil {
if err := c.restoreFromGlobalStore(); err != nil {
log.Debugf("Failed to restore from global Datastore due to %v", err)
}
if cfg != nil && cfg.Cluster.Watcher != nil {
if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil {
// Failing to initalize discovery is a bad situation to be in.
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Discovery : %v", err)
}
if err := c.restoreFromLocalStore(); err != nil {
log.Debugf("Failed to restore from local Datastore due to %v", err)
}
}
if err := initDrivers(c); err != nil {
return nil, err
}
if err := initIpams(c, c.getStore(datastore.LocalScope),
c.getStore(datastore.GlobalScope)); err != nil {
return nil, err
}
if err := c.startExternalKeyListener(); err != nil {
@ -325,15 +323,6 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
if !config.IsValidName(name) {
return nil, ErrInvalidName(name)
}
// Check if a network already exists with the specified network name
c.Lock()
for _, n := range c.networks {
if n.name == name {
c.Unlock()
return nil, NetworkNameError(name)
}
}
c.Unlock()
// Construct the network object
network := &network{
@ -342,13 +331,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
ipamType: ipamapi.DefaultIPAM,
id: stringid.GenerateRandomID(),
ctrlr: c,
endpoints: endpointTable{},
persist: true,
drvOnce: &sync.Once{},
}
network.processOptions(options...)
if _, err := c.loadNetworkDriver(network); err != nil {
// Make sure we have a driver available for this network type
// before we allocate anything.
if _, err := network.driver(); err != nil {
return nil, err
}
@ -364,7 +355,16 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
}
}()
if err = c.addNetwork(network); err != nil {
// addNetwork can be called for local scope network lazily when
// an endpoint is created after a restart and the network was
// created in previous life. Make sure you wrap around the driver
// notification of network creation in once call so that the driver
// invoked only once in case both the network and endpoint creation
// happens in the same lifetime.
network.drvOnce.Do(func() {
err = c.addNetwork(network)
})
if err != nil {
return nil, err
}
@ -380,35 +380,28 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
}
func (c *controller) addNetwork(n *network) error {
if _, err := c.loadNetworkDriver(n); err != nil {
d, err := n.driver()
if err != nil {
return err
}
n.Lock()
d := n.driver
n.Unlock()
// Create the network
if err := d.CreateNetwork(n.id, n.generic, n.getIPv4Data(), n.getIPv6Data()); err != nil {
return err
}
if n.isGlobalScoped() {
if err := n.watchEndpoints(); err != nil {
return err
}
}
c.Lock()
c.networks[n.id] = n
c.Unlock()
return nil
}
func (c *controller) Networks() []Network {
c.Lock()
defer c.Unlock()
var list []Network
list := make([]Network, 0, len(c.networks))
for _, n := range c.networks {
networks, err := c.getNetworksFromStore()
if err != nil {
log.Error(err)
}
for _, n := range networks {
list = append(list, n)
}
@ -450,12 +443,13 @@ func (c *controller) NetworkByID(id string) (Network, error) {
if id == "" {
return nil, ErrInvalidID(id)
}
c.Lock()
defer c.Unlock()
if n, ok := c.networks[id]; ok {
return n, nil
n, err := c.getNetworkFromStore(id)
if err != nil {
return nil, ErrNoSuchNetwork(id)
}
return nil, ErrNoSuchNetwork(id)
return n, nil
}
// NewSandbox creates a new sandbox for the passed container id
@ -620,30 +614,7 @@ func (c *controller) getIpamDriver(name string) (ipamapi.Ipam, error) {
}
func (c *controller) Stop() {
if c.localStore != nil {
c.localStore.KVStore().Close()
}
c.closeStores()
c.stopExternalKeyListener()
osl.GC()
}
func (c *controller) loadNetworkDriver(n *network) (driverapi.Driver, error) {
// Check if a driver for the specified network type is available
c.Lock()
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok {
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
return nil, err
}
}
n.Lock()
n.svcRecords = svcMap{}
n.driver = dd.driver
n.dataScope = dd.capability.DataScope
n.Unlock()
return dd.driver, nil
}

View File

@ -1,10 +1,6 @@
package driverapi
import (
"net"
"github.com/docker/libnetwork/datastore"
)
import "net"
// NetworkPluginEndpointType represents the Endpoint Type used by Plugin system
const NetworkPluginEndpointType = "NetworkDriver"
@ -105,7 +101,7 @@ type DriverCallback interface {
// Capability represents the high level capabilities of the drivers which libnetwork can make use of
type Capability struct {
DataScope datastore.DataScope
DataScope string
}
// DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on

View File

@ -3,6 +3,7 @@ package libnetwork
import (
"strings"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi"
builtinIpam "github.com/docker/libnetwork/ipams/builtin"
@ -32,9 +33,9 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} {
config := make(map[string]interface{})
if c.validateGlobalStoreConfig() {
config[netlabel.KVProvider] = c.cfg.GlobalStore.Client.Provider
config[netlabel.KVProviderURL] = c.cfg.GlobalStore.Client.Address
if dcfg, ok := c.cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() {
config[netlabel.KVProvider] = dcfg.Client.Provider
config[netlabel.KVProviderURL] = dcfg.Client.Address
}
for _, label := range c.cfg.Daemon.Labels {

View File

@ -43,12 +43,16 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
return err
}
// Since we perform lazy configuration make sure we try
// configuring the driver when we enter CreateEndpoint since
// CreateNetwork may not be called in every node.
if err := d.configure(); err != nil {
return err
}
n := d.network(nid)
if n == nil {
n, err = d.createNetworkfromStore(nid)
if err != nil {
return fmt.Errorf("network id %q not found", nid)
}
return fmt.Errorf("network id %q not found", nid)
}
ep := &endpoint{

View File

@ -45,12 +45,13 @@ type network struct {
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
var err error
if id == "" {
return fmt.Errorf("invalid network id")
}
if err = d.configure(); err != nil {
// Since we perform lazy configuration make sure we try
// configuring the driver when we enter CreateNetwork
if err := d.configure(); err != nil {
return err
}
@ -71,29 +72,16 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat
n.subnets = append(n.subnets, s)
}
for {
// If the datastore has the network object already
// there is no need to do a write.
err = d.store.GetObject(datastore.Key(n.Key()...), n)
if err == nil || err != datastore.ErrKeyNotFound {
break
}
err = n.writeToStore()
if err == nil || err != datastore.ErrKeyModified {
break
}
}
if err != nil {
if err := n.writeToStore(); err != nil {
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
}
d.addNetwork(n)
return nil
}
func (d *driver) createNetworkfromStore(nid string) (*network, error) {
/* func (d *driver) createNetworkfromStore(nid string) (*network, error) {
n := &network{
id: nid,
driver: d,
@ -107,7 +95,7 @@ func (d *driver) createNetworkfromStore(nid string) (*network, error) {
return nil, fmt.Errorf("unable to get network %q from data store, %v", nid, err)
}
return n, nil
}
}*/
func (d *driver) DeleteNetwork(nid string) error {
if nid == "" {
@ -313,9 +301,34 @@ func (d *driver) deleteNetwork(nid string) {
func (d *driver) network(nid string) *network {
d.Lock()
defer d.Unlock()
networks := d.networks
d.Unlock()
return d.networks[nid]
n, ok := networks[nid]
if !ok {
n = d.getNetworkFromStore(nid)
if n != nil {
n.driver = d
n.endpoints = endpointTable{}
n.once = &sync.Once{}
networks[nid] = n
}
}
return n
}
func (d *driver) getNetworkFromStore(nid string) *network {
if d.store == nil {
return nil
}
n := &network{id: nid}
if err := d.store.GetObject(datastore.Key(n.Key()...), n); err != nil {
return nil
}
return n
}
func (n *network) sandbox() osl.Sandbox {
@ -408,30 +421,23 @@ func (n *network) SetValue(value []byte) error {
subnetIP, _ := types.ParseCIDR(subnetIPstr)
gwIP, _ := types.ParseCIDR(gwIPstr)
// If the network is being created by reading from the
// datastore subnets have to created. If the network
// already exists update only the subnets' vni field
if len(n.subnets) == 0 {
s := &subnet{
subnetIP: subnetIP,
gwIP: gwIP,
vni: vni,
once: &sync.Once{},
}
n.subnets = append(n.subnets, s)
return nil
s := &subnet{
subnetIP: subnetIP,
gwIP: gwIP,
vni: vni,
once: &sync.Once{},
}
n.subnets = append(n.subnets, s)
sNet := n.getMatchingSubnet(subnetIP)
if sNet != nil {
if vni != 0 {
sNet.vni = vni
}
sNet.vni = vni
}
return nil
}
func (n *network) DataScope() datastore.DataScope {
func (n *network) DataScope() string {
return datastore.GlobalScope
}

View File

@ -6,7 +6,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/idm"
@ -84,8 +83,8 @@ func (d *driver) configure() error {
provURL, urlOk := d.config[netlabel.KVProviderURL]
if provOk && urlOk {
cfg := &config.DatastoreCfg{
Client: config.DatastoreClientCfg{
cfg := &datastore.ScopeCfg{
Client: datastore.ScopeClientCfg{
Provider: provider.(string),
Address: provURL.(string),
},
@ -94,7 +93,7 @@ func (d *driver) configure() error {
if confOk {
cfg.Client.Config = provConfig.(*store.Config)
}
d.store, err = datastore.NewDataStore(cfg)
d.store, err = datastore.NewDataStore(datastore.GlobalScope, cfg)
if err != nil {
err = fmt.Errorf("failed to initialize data store: %v", err)
return

View File

@ -12,6 +12,7 @@ import (
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
"github.com/docker/libnetwork/types"
)
@ -107,6 +108,37 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
return nil
}
func (ep *endpoint) New() datastore.KVObject {
return &endpoint{network: ep.getNetwork()}
}
func (ep *endpoint) CopyTo(o datastore.KVObject) error {
ep.Lock()
defer ep.Unlock()
dstEp := o.(*endpoint)
dstEp.name = ep.name
dstEp.id = ep.id
dstEp.sandboxID = ep.sandboxID
dstEp.dbIndex = ep.dbIndex
dstEp.dbExists = ep.dbExists
if ep.iface != nil {
dstEp.iface = &endpointInterface{}
ep.iface.CopyTo(dstEp.iface)
}
dstEp.exposedPorts = make([]types.TransportPort, len(ep.exposedPorts))
copy(dstEp.exposedPorts, ep.exposedPorts)
dstEp.generic = options.Generic{}
for k, v := range ep.generic {
dstEp.generic[k] = v
}
return nil
}
func (ep *endpoint) ID() string {
ep.Lock()
defer ep.Unlock()
@ -122,16 +154,28 @@ func (ep *endpoint) Name() string {
}
func (ep *endpoint) Network() string {
return ep.getNetwork().name
if ep.network == nil {
return ""
}
return ep.network.name
}
// endpoint Key structure : endpoint/network-id/endpoint-id
func (ep *endpoint) Key() []string {
return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id, ep.id}
if ep.network == nil {
return nil
}
return []string{datastore.EndpointKeyPrefix, ep.network.id, ep.id}
}
func (ep *endpoint) KeyPrefix() []string {
return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id}
if ep.network == nil {
return nil
}
return []string{datastore.EndpointKeyPrefix, ep.network.id}
}
func (ep *endpoint) networkIDFromKey(key string) (string, error) {
@ -177,7 +221,7 @@ func (ep *endpoint) Exists() bool {
}
func (ep *endpoint) Skip() bool {
return ep.getNetwork().Skip()
return ep.getNetwork().Skip() || ep.DataScope() == datastore.LocalScope
}
func (ep *endpoint) processOptions(options ...EndpointOption) {
@ -191,8 +235,22 @@ func (ep *endpoint) processOptions(options ...EndpointOption) {
}
}
func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
func (ep *endpoint) getNetwork() *network {
ep.Lock()
defer ep.Unlock()
return ep.network
}
func (ep *endpoint) getNetworkFromStore() (*network, error) {
if ep.network == nil {
return nil, fmt.Errorf("invalid network object in endpoint %s", ep.Name())
}
return ep.network.ctrlr.getNetworkFromStore(ep.network.id)
}
func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
if sbox == nil {
return types.BadRequestErrorf("endpoint cannot be joined by nil container")
}
@ -215,15 +273,27 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
network, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network from store during join: %v", err)
}
ep, err = network.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during join: %v", err)
}
ep.Lock()
if ep.sandboxID != "" {
ep.Unlock()
return types.ForbiddenErrorf("a sandbox has already joined the endpoint")
}
ep.Unlock()
ep.Lock()
ep.network = network
ep.sandboxID = sbox.ID()
ep.joinInfo = &endpointJoinInfo{}
network := ep.network
epid := ep.id
ep.Unlock()
defer func() {
@ -235,12 +305,16 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
}()
network.Lock()
driver := network.driver
nid := network.id
network.Unlock()
ep.processOptions(options...)
driver, err := network.driver()
if err != nil {
return fmt.Errorf("failed to join endpoint: %v", err)
}
err = driver.Join(nid, epid, sbox.Key(), ep, sbox.Labels())
if err != nil {
return err
@ -262,14 +336,15 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
return err
}
if err = sb.updateDNS(ep.getNetwork().enableIPv6); err != nil {
// Watch for service records
network.getController().watchSvcRecord(ep)
if err = sb.updateDNS(network.enableIPv6); err != nil {
return err
}
if !ep.isLocalScoped() {
if err = network.ctrlr.updateToStore(ep); err != nil {
return err
}
if err = network.getController().updateToStore(ep); err != nil {
return err
}
sb.Lock()
@ -327,6 +402,16 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
return types.BadRequestErrorf("not a valid Sandbox interface")
}
n, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network from store during leave: %v", err)
}
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during leave: %v", err)
}
ep.Lock()
sid := ep.sandboxID
ep.Unlock()
@ -342,21 +427,19 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
ep.Lock()
ep.sandboxID = ""
n := ep.network
ep.network = n
ep.Unlock()
n.Lock()
c := n.ctrlr
d := n.driver
n.Unlock()
if err := n.getController().updateToStore(ep); err != nil {
ep.Lock()
ep.sandboxID = sid
ep.Unlock()
return err
}
if !ep.isLocalScoped() {
if err := c.updateToStore(ep); err != nil {
ep.Lock()
ep.sandboxID = sid
ep.Unlock()
return err
}
d, err := n.driver()
if err != nil {
return fmt.Errorf("failed to leave endpoint: %v", err)
}
if err := d.Leave(n.id, ep.id); err != nil {
@ -367,6 +450,9 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
return err
}
// unwatch for service records
n.getController().unWatchSvcRecord(ep)
if sb.needDefaultGW() {
ep := sb.getEPwithoutGateway()
if ep == nil {
@ -379,49 +465,48 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
func (ep *endpoint) Delete() error {
var err error
n, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network during Delete: %v", err)
}
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during Delete: %v", err)
}
ep.Lock()
epid := ep.id
name := ep.name
n := ep.network
if ep.sandboxID != "" {
ep.Unlock()
return &ActiveContainerError{name: name, id: epid}
}
n.Lock()
ctrlr := n.ctrlr
n.Unlock()
ep.Unlock()
if !ep.isLocalScoped() {
if err = ctrlr.deleteFromStore(ep); err != nil {
return err
}
}
defer func() {
if err != nil {
ep.dbExists = false
if !ep.isLocalScoped() {
if e := ctrlr.updateToStore(ep); e != nil {
log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
}
}
}
}()
// Update the endpoint count in network and update it in the datastore
n.DecEndpointCnt()
if err = ctrlr.updateToStore(n); err != nil {
if err = n.DecEndpointCnt(); err != nil {
return err
}
defer func() {
if err != nil {
n.IncEndpointCnt()
if e := ctrlr.updateToStore(n); e != nil {
if e := n.IncEndpointCnt(); e != nil {
log.Warnf("failed to update network %s : %v", n.name, e)
}
}
}()
if err = n.getController().deleteFromStore(ep); err != nil {
return err
}
defer func() {
if err != nil {
ep.dbExists = false
if e := n.getController().updateToStore(ep); e != nil {
log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
}
}
}()
if err = ep.deleteEndpoint(); err != nil {
return err
}
@ -438,38 +523,21 @@ func (ep *endpoint) deleteEndpoint() error {
epid := ep.id
ep.Unlock()
n.Lock()
_, ok := n.endpoints[epid]
if !ok {
n.Unlock()
return nil
driver, err := n.driver()
if err != nil {
return fmt.Errorf("failed to delete endpoint: %v", err)
}
nid := n.id
driver := n.driver
delete(n.endpoints, epid)
n.Unlock()
if err := driver.DeleteEndpoint(nid, epid); err != nil {
if err := driver.DeleteEndpoint(n.id, epid); err != nil {
if _, ok := err.(types.ForbiddenError); ok {
n.Lock()
n.endpoints[epid] = ep
n.Unlock()
return err
}
log.Warnf("driver error deleting endpoint %s : %v", name, err)
}
n.updateSvcRecord(ep, false)
return nil
}
func (ep *endpoint) getNetwork() *network {
ep.Lock()
defer ep.Unlock()
return ep.network
}
func (ep *endpoint) getSandbox() (*sandbox, bool) {
ep.Lock()
c := ep.network.getController()
@ -545,14 +613,8 @@ func JoinOptionPriority(ep Endpoint, prio int) EndpointOption {
}
}
func (ep *endpoint) DataScope() datastore.DataScope {
ep.Lock()
defer ep.Unlock()
return ep.network.dataScope
}
func (ep *endpoint) isLocalScoped() bool {
return ep.DataScope() == datastore.LocalScope
func (ep *endpoint) DataScope() string {
return ep.getNetwork().DataScope()
}
func (ep *endpoint) assignAddress() error {

View File

@ -2,6 +2,7 @@ package libnetwork
import (
"encoding/json"
"fmt"
"net"
"github.com/docker/libnetwork/driverapi"
@ -115,6 +116,21 @@ func (epi *endpointInterface) UnmarshalJSON(b []byte) error {
return nil
}
func (epi *endpointInterface) CopyTo(dstEpi *endpointInterface) error {
dstEpi.mac = types.GetMacCopy(epi.mac)
dstEpi.addr = types.GetIPNetCopy(epi.addr)
dstEpi.addrv6 = types.GetIPNetCopy(epi.addrv6)
dstEpi.srcName = epi.srcName
dstEpi.dstPrefix = epi.dstPrefix
dstEpi.poolID = epi.poolID
for _, route := range epi.routes {
dstEpi.routes = append(dstEpi.routes, types.GetIPNetCopy(route))
}
return nil
}
type endpointJoinInfo struct {
gw net.IP
gw6 net.IP
@ -122,21 +138,38 @@ type endpointJoinInfo struct {
}
func (ep *endpoint) Info() EndpointInfo {
return ep
n, err := ep.getNetworkFromStore()
if err != nil {
return nil
}
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return nil
}
sb, ok := ep.getSandbox()
if !ok {
// endpoint hasn't joined any sandbox.
// Just return the endpoint
return ep
}
return sb.getEndpoint(ep.ID())
}
func (ep *endpoint) DriverInfo() (map[string]interface{}, error) {
ep.Lock()
network := ep.network
epid := ep.id
ep.Unlock()
n, err := ep.getNetworkFromStore()
if err != nil {
return nil, fmt.Errorf("could not find network in store for driver info: %v", err)
}
network.Lock()
driver := network.driver
nid := network.id
network.Unlock()
driver, err := n.driver()
if err != nil {
return nil, fmt.Errorf("failed to get driver info: %v", err)
}
return driver.EndpointOperInfo(nid, epid)
return driver.EndpointOperInfo(n.ID(), ep.ID())
}
func (ep *endpoint) Iface() InterfaceInfo {

View File

@ -6,7 +6,6 @@ import (
"net"
"testing"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/types"
@ -32,11 +31,6 @@ func TestDriverRegistration(t *testing.T) {
}
}
func SetTestDataStore(c NetworkController, custom datastore.DataStore) {
con := c.(*controller)
con.globalStore = custom
}
func TestNetworkMarshalling(t *testing.T) {
n := &network{
name: "Miao",

View File

@ -50,7 +50,7 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore()))
//libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore()))
x := m.Run()
controller.Stop()
@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
func createController() error {
var err error
// Cleanup local datastore file
os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
option := options.Generic{
"EnableIPForwarding": true,
}
@ -358,27 +361,6 @@ func TestNilRemoteDriver(t *testing.T) {
}
}
func TestDuplicateNetwork(t *testing.T) {
if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)()
}
// Creating a default bridge name network (can't be removed)
_, err := controller.NewNetwork(bridgeNetType, "testdup")
if err != nil {
t.Fatal(err)
}
_, err = controller.NewNetwork(bridgeNetType, "testdup")
if err == nil {
t.Fatal("Expected to fail. But instead succeeded")
}
if _, ok := err.(libnetwork.NetworkNameError); !ok {
t.Fatalf("Did not fail with expected error. Actual error: %v", err)
}
}
func TestNetworkName(t *testing.T) {
if !testutils.IsRunningInContainer() {
defer testutils.SetupTestOSContext(t)()
@ -703,7 +685,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) {
if netWanted == nil {
t.Fatal(err)
}
if net1 != netWanted {
if net1.ID() != netWanted.ID() {
t.Fatal(err)
}
@ -712,7 +694,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) {
if netWanted == nil {
t.Fatal(err)
}
if net2 != netWanted {
if net2.ID() != netWanted.ID() {
t.Fatal(err)
}
}
@ -843,7 +825,7 @@ func TestControllerQuery(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected failure for NetworkByID(): %v", err)
}
if net1 != g {
if net1.ID() != g.ID() {
t.Fatalf("NetworkByID() returned unexpected element: %v", g)
}
@ -863,7 +845,7 @@ func TestControllerQuery(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected failure for NetworkByID(): %v", err)
}
if net2 != g {
if net2.ID() != g.ID() {
t.Fatalf("NetworkByID() returned unexpected element: %v", g)
}
}
@ -940,7 +922,7 @@ func TestNetworkQuery(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if ep12 != e {
if ep12.ID() != e.ID() {
t.Fatalf("EndpointByID() returned %v instead of %v", e, ep12)
}

View File

@ -2,6 +2,7 @@ package libnetwork
import (
"encoding/json"
"fmt"
"net"
"sync"
@ -127,7 +128,6 @@ type network struct {
networkType string
id string
ipamType string
driver driverapi.Driver
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
@ -135,14 +135,14 @@ type network struct {
ipamV6Info []*IpamInfo
enableIPv6 bool
endpointCnt uint64
endpoints endpointTable
generic options.Generic
dbIndex uint64
svcRecords svcMap
dbExists bool
persist bool
stopWatchCh chan struct{}
dataScope datastore.DataScope
scope string
drvOnce *sync.Once
sync.Mutex
}
@ -164,11 +164,7 @@ func (n *network) Type() string {
n.Lock()
defer n.Unlock()
if n.driver == nil {
return ""
}
return n.driver.Type()
return n.networkType
}
func (n *network) Key() []string {
@ -220,10 +216,72 @@ func (n *network) Skip() bool {
return !n.persist
}
func (n *network) DataScope() datastore.DataScope {
func (n *network) New() datastore.KVObject {
n.Lock()
defer n.Unlock()
return n.dataScope
return &network{
ctrlr: n.ctrlr,
drvOnce: &sync.Once{},
}
}
// CopyTo deep copies to the destination IpamInfo
func (i *IpamInfo) CopyTo(dstI *IpamInfo) error {
dstI.PoolID = i.PoolID
if i.Meta != nil {
dstI.Meta = make(map[string]string)
for k, v := range i.Meta {
dstI.Meta[k] = v
}
}
dstI.AddressSpace = i.AddressSpace
dstI.Pool = types.GetIPNetCopy(i.Pool)
dstI.Gateway = types.GetIPNetCopy(i.Gateway)
if i.AuxAddresses != nil {
dstI.AuxAddresses = make(map[string]*net.IPNet)
for k, v := range i.AuxAddresses {
dstI.AuxAddresses[k] = types.GetIPNetCopy(v)
}
}
return nil
}
func (n *network) CopyTo(o datastore.KVObject) error {
n.Lock()
defer n.Unlock()
dstN := o.(*network)
dstN.name = n.name
dstN.id = n.id
dstN.networkType = n.networkType
dstN.ipamType = n.ipamType
dstN.endpointCnt = n.endpointCnt
dstN.enableIPv6 = n.enableIPv6
dstN.persist = n.persist
dstN.dbIndex = n.dbIndex
dstN.dbExists = n.dbExists
dstN.drvOnce = n.drvOnce
for _, v4info := range n.ipamV4Info {
dstV4Info := &IpamInfo{}
v4info.CopyTo(dstV4Info)
dstN.ipamV4Info = append(dstN.ipamV4Info, dstV4Info)
}
dstN.generic = options.Generic{}
for k, v := range n.generic {
dstN.generic[k] = v
}
return nil
}
func (n *network) DataScope() string {
return n.driverScope()
}
func (n *network) EndpointCnt() uint64 {
@ -232,16 +290,20 @@ func (n *network) EndpointCnt() uint64 {
return n.endpointCnt
}
func (n *network) IncEndpointCnt() {
func (n *network) IncEndpointCnt() error {
n.Lock()
n.endpointCnt++
n.Unlock()
return n.getController().updateToStore(n)
}
func (n *network) DecEndpointCnt() {
func (n *network) DecEndpointCnt() error {
n.Lock()
n.endpointCnt--
n.Unlock()
return n.getController().updateToStore(n)
}
// TODO : Can be made much more generic with the help of reflection (but has some golang limitations)
@ -372,17 +434,55 @@ func (n *network) processOptions(options ...NetworkOption) {
}
}
func (n *network) Delete() error {
var err error
func (n *network) driverScope() string {
c := n.getController()
ctrlr := n.getController()
ctrlr.Lock()
_, ok := ctrlr.networks[n.id]
ctrlr.Unlock()
c.Lock()
// Check if a driver for the specified network type is available
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok {
return &UnknownNetworkError{name: n.name, id: n.id}
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
// If driver could not be resolved simply return an empty string
return ""
}
}
return dd.capability.DataScope
}
func (n *network) driver() (driverapi.Driver, error) {
c := n.getController()
c.Lock()
// Check if a driver for the specified network type is available
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok {
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
return nil, err
}
}
return dd.driver, nil
}
func (n *network) Delete() error {
n.Lock()
c := n.ctrlr
name := n.name
id := n.id
n.Unlock()
n, err := c.getNetworkFromStore(id)
if err != nil {
return &UnknownNetworkError{name: name, id: id}
}
numEps := n.EndpointCnt()
@ -390,9 +490,22 @@ func (n *network) Delete() error {
return &ActiveEndpointsError{name: n.name, id: n.id}
}
// deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help
// prevent any possible race between endpoint join and network delete
if err = ctrlr.deleteFromStore(n); err != nil {
if err = n.deleteNetwork(); err != nil {
return err
}
defer func() {
if err != nil {
if e := c.addNetwork(n); e != nil {
log.Warnf("failed to rollback deleteNetwork for network %s: %v",
n.Name(), err)
}
}
}()
// deleteFromStore performs an atomic delete operation and the
// network.endpointCnt field will help prevent any possible
// race between endpoint join and network delete
if err = n.getController().deleteFromStore(n); err != nil {
if err == datastore.ErrKeyModified {
return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.")
}
@ -402,65 +515,68 @@ func (n *network) Delete() error {
defer func() {
if err != nil {
n.dbExists = false
if e := ctrlr.updateToStore(n); e != nil {
if e := n.getController().updateToStore(n); e != nil {
log.Warnf("failed to recreate network in store %s : %v", n.name, e)
}
}
}()
if err = n.deleteNetwork(); err != nil {
return err
}
n.ipamRelease()
return nil
}
func (n *network) deleteNetwork() error {
n.Lock()
id := n.id
d := n.driver
n.ctrlr.Lock()
delete(n.ctrlr.networks, id)
n.ctrlr.Unlock()
n.Unlock()
d, err := n.driver()
if err != nil {
return fmt.Errorf("failed deleting network: %v", err)
}
if err := d.DeleteNetwork(n.id); err != nil {
// If it is bridge network type make sure we call the driver about the network
// because the network may have been created in some past life of libnetwork.
if n.Type() == "bridge" {
n.drvOnce.Do(func() {
err = n.getController().addNetwork(n)
})
if err != nil {
return err
}
}
if err := d.DeleteNetwork(n.ID()); err != nil {
// Forbidden Errors should be honored
if _, ok := err.(types.ForbiddenError); ok {
n.ctrlr.Lock()
n.ctrlr.networks[n.id] = n
n.ctrlr.Unlock()
return err
}
log.Warnf("driver error deleting network %s : %v", n.name, err)
}
n.stopWatch()
return nil
}
func (n *network) addEndpoint(ep *endpoint) error {
var err error
n.Lock()
n.endpoints[ep.id] = ep
d := n.driver
n.Unlock()
d, err := n.driver()
if err != nil {
return fmt.Errorf("failed to add endpoint: %v", err)
}
defer func() {
// If it is bridge network type make sure we call the driver about the network
// because the network may have been created in some past life of libnetwork.
if n.Type() == "bridge" {
n.drvOnce.Do(func() {
err = n.getController().addNetwork(n)
})
if err != nil {
n.Lock()
delete(n.endpoints, ep.id)
n.Unlock()
return err
}
}()
}
err = d.CreateEndpoint(n.id, ep.id, ep.Interface(), ep.generic)
if err != nil {
return types.InternalErrorf("failed to create endpoint %s on network %s: %v", ep.Name(), n.Name(), err)
return types.InternalErrorf("failed to create endpoint %s on network %s: %v",
ep.Name(), n.Name(), err)
}
n.updateSvcRecord(ep, true)
return nil
}
@ -476,7 +592,16 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}}
ep.id = stringid.GenerateRandomID()
// Initialize ep.network with a possibly stale copy of n. We need this to get network from
// store. But once we get it from store we will have the most uptodate copy possible.
ep.network = n
ep.network, err = ep.getNetworkFromStore()
if err != nil {
return nil, fmt.Errorf("failed to get network during CreateEndpoint: %v", err)
}
n = ep.network
ep.processOptions(options...)
if err = ep.assignAddress(); err != nil {
@ -488,46 +613,46 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
}
}()
ctrlr := n.getController()
n.IncEndpointCnt()
if err = ctrlr.updateToStore(n); err != nil {
return nil, err
}
defer func() {
if err != nil {
n.DecEndpointCnt()
if err = ctrlr.updateToStore(n); err != nil {
log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err)
}
}
}()
if err = n.addEndpoint(ep); err != nil {
return nil, err
}
defer func() {
if err != nil {
if e := ep.Delete(); ep != nil {
if e := ep.deleteEndpoint(); e != nil {
log.Warnf("cleaning up endpoint failed %s : %v", name, e)
}
}
}()
if !ep.isLocalScoped() {
if err = ctrlr.updateToStore(ep); err != nil {
return nil, err
if err = n.getController().updateToStore(ep); err != nil {
return nil, err
}
defer func() {
if err != nil {
if e := n.getController().deleteFromStore(ep); e != nil {
log.Warnf("error rolling back endpoint %s from store: %v", name, e)
}
}
}()
// Increment endpoint count to indicate completion of endpoint addition
if err = n.IncEndpointCnt(); err != nil {
return nil, err
}
return ep, nil
}
func (n *network) Endpoints() []Endpoint {
n.Lock()
defer n.Unlock()
list := make([]Endpoint, 0, len(n.endpoints))
for _, e := range n.endpoints {
list = append(list, e)
var list []Endpoint
endpoints, err := n.getEndpointsFromStore()
if err != nil {
log.Error(err)
}
for _, ep := range endpoints {
list = append(list, ep)
}
return list
@ -568,28 +693,32 @@ func (n *network) EndpointByID(id string) (Endpoint, error) {
if id == "" {
return nil, ErrInvalidID(id)
}
n.Lock()
defer n.Unlock()
if e, ok := n.endpoints[id]; ok {
return e, nil
ep, err := n.getEndpointFromStore(id)
if err != nil {
return nil, ErrNoSuchEndpoint(id)
}
return nil, ErrNoSuchEndpoint(id)
return ep, nil
}
func (n *network) isGlobalScoped() bool {
return n.DataScope() == datastore.GlobalScope
}
func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool) {
c := n.getController()
sr, ok := c.svcDb[n.ID()]
if !ok {
c.svcDb[n.ID()] = svcMap{}
sr = c.svcDb[n.ID()]
}
func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) {
n.Lock()
var recs []etchosts.Record
if iface := ep.Iface(); iface.Address() != nil {
if isAdd {
n.svcRecords[ep.Name()] = iface.Address().IP
n.svcRecords[ep.Name()+"."+n.name] = iface.Address().IP
sr[ep.Name()] = iface.Address().IP
sr[ep.Name()+"."+n.name] = iface.Address().IP
} else {
delete(n.svcRecords, ep.Name())
delete(n.svcRecords, ep.Name()+"."+n.name)
delete(sr, ep.Name())
delete(sr, ep.Name()+"."+n.name)
}
recs = append(recs, etchosts.Record{
@ -610,12 +739,11 @@ func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) {
}
var sbList []*sandbox
n.WalkEndpoints(func(e Endpoint) bool {
if sb, hasSandbox := e.(*endpoint).getSandbox(); hasSandbox {
for _, ep := range localEps {
if sb, hasSandbox := ep.getSandbox(); hasSandbox {
sbList = append(sbList, sb)
}
return false
})
}
for _, sb := range sbList {
if isAdd {
@ -631,7 +759,9 @@ func (n *network) getSvcRecords() []etchosts.Record {
defer n.Unlock()
var recs []etchosts.Record
for h, ip := range n.svcRecords {
sr, _ := n.ctrlr.svcDb[n.id]
for h, ip := range sr {
recs = append(recs, etchosts.Record{
Hosts: h,
IP: ip.String(),
@ -799,7 +929,7 @@ func (n *network) deriveAddressSpace() (string, error) {
if !ok {
return "", types.NotFoundErrorf("could not find ipam driver %s to get default address space", n.ipamType)
}
if n.isGlobalScoped() {
if n.DataScope() == datastore.GlobalScope {
return ipd.defaultGlobalAddressSpace, nil
}
return ipd.defaultLocalAddressSpace, nil

View File

@ -247,6 +247,19 @@ func (sb *sandbox) getConnectedEndpoints() []*endpoint {
return eps
}
func (sb *sandbox) getEndpoint(id string) *endpoint {
sb.Lock()
defer sb.Unlock()
for _, ep := range sb.endpoints {
if ep.id == id {
return ep
}
}
return nil
}
func (sb *sandbox) updateGateway(ep *endpoint) error {
sb.Lock()
osSbox := sb.osSbox
@ -359,7 +372,13 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
return nil
}
func (sb *sandbox) clearNetworkResources(ep *endpoint) error {
func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
ep := sb.getEndpoint(origEp.id)
if ep == nil {
return fmt.Errorf("could not find the sandbox endpoint data for endpoint %s",
ep.name)
}
sb.Lock()
osSbox := sb.osSbox
sb.Unlock()
@ -837,7 +856,7 @@ func (eh epHeap) Less(i, j int) bool {
cjp = 0
}
if cip == cjp {
return eh[i].getNetwork().Name() < eh[j].getNetwork().Name()
return eh[i].network.Name() < eh[j].network.Name()
}
return cip > cjp

View File

@ -115,21 +115,21 @@ func TestSandboxAddMultiPrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep3 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() {
t.Fatal("Expected ep3 to be at the top of the heap. But did not find ep3 at the top of the heap")
}
if err := ep3.Leave(sbx); err != nil {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep2 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() {
t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap")
}
if err := ep2.Leave(sbx); err != nil {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep1 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() {
t.Fatal("Expected ep1 to be at the top of the heap after removing ep2. But did not find ep1 at the top of the heap")
}
@ -138,7 +138,7 @@ func TestSandboxAddMultiPrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep3 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() {
t.Fatal("Expected ep3 to be at the top of the heap after adding ep3 back. But did not find ep3 at the top of the heap")
}
@ -185,7 +185,7 @@ func TestSandboxAddSamePrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep1 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() {
t.Fatal("Expected ep1 to be at the top of the heap. But did not find ep1 at the top of the heap")
}
@ -193,7 +193,7 @@ func TestSandboxAddSamePrio(t *testing.T) {
t.Fatal(err)
}
if ctrlr.sandboxes[sid].endpoints[0] != ep2 {
if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() {
t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap")
}

View File

@ -1,408 +1,348 @@
package libnetwork
import (
"encoding/json"
"fmt"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
)
var (
defaultBoltTimeout = 3 * time.Second
defaultLocalStoreConfig = config.DatastoreCfg{
Embedded: true,
Client: config.DatastoreClientCfg{
Provider: "boltdb",
Address: defaultPrefix + "/boltdb.db",
Config: &store.Config{
Bucket: "libnetwork",
ConnectionTimeout: defaultBoltTimeout,
},
},
}
)
func (c *controller) validateGlobalStoreConfig() bool {
return c.cfg != nil && c.cfg.GlobalStore.Client.Provider != "" && c.cfg.GlobalStore.Client.Address != ""
}
func (c *controller) initGlobalStore() error {
func (c *controller) initStores() error {
c.Lock()
cfg := c.cfg
c.Unlock()
if !c.validateGlobalStoreConfig() {
return fmt.Errorf("globalstore initialization requires a valid configuration")
}
store, err := datastore.NewDataStore(&cfg.GlobalStore)
if err != nil {
return err
}
c.Lock()
c.globalStore = store
c.Unlock()
return nil
}
func (c *controller) initLocalStore() error {
c.Lock()
cfg := c.cfg
c.Unlock()
localStore, err := datastore.NewDataStore(c.getLocalStoreConfig(cfg))
if err != nil {
return err
}
c.Lock()
c.localStore = localStore
c.Unlock()
return nil
}
func (c *controller) restoreFromGlobalStore() error {
c.Lock()
s := c.globalStore
c.Unlock()
if s == nil {
return nil
}
c.restore("global")
return c.watchNetworks()
}
func (c *controller) restoreFromLocalStore() error {
c.Lock()
s := c.localStore
c.Unlock()
if s != nil {
c.restore("local")
}
return nil
}
func (c *controller) restore(store string) {
nws, err := c.getNetworksFromStore(store == "global")
if err == nil {
c.processNetworkUpdate(nws, nil)
} else if err != datastore.ErrKeyNotFound {
log.Warnf("failed to read networks from %s store during init : %v", store, err)
}
}
func (c *controller) getNetworksFromStore(global bool) ([]*store.KVPair, error) {
var cs datastore.DataStore
c.Lock()
if global {
cs = c.globalStore
} else {
cs = c.localStore
}
c.Unlock()
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
}
func (c *controller) newNetworkFromStore(n *network) error {
n.Lock()
n.ctrlr = c
n.endpoints = endpointTable{}
n.Unlock()
return c.addNetwork(n)
}
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
ep.Lock()
n := ep.network
id := ep.id
ep.Unlock()
_, err := n.EndpointByID(id)
if err != nil {
if _, ok := err.(ErrNoSuchEndpoint); ok {
return n.addEndpoint(ep)
}
}
return err
}
func (c *controller) updateToStore(kvObject datastore.KV) error {
if kvObject.Skip() {
return nil
}
cs := c.getDataStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...))
return nil
}
return cs.PutObjectAtomic(kvObject)
}
func (c *controller) deleteFromStore(kvObject datastore.KV) error {
if kvObject.Skip() {
return nil
}
cs := c.getDataStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore not initialized. kv object %s is not deleted from datastore", datastore.Key(kvObject.Key()...))
return nil
}
if err := cs.DeleteObjectAtomic(kvObject); err != nil {
return err
}
return nil
}
func (c *controller) watchNetworks() error {
if !c.validateGlobalStoreConfig() {
return nil
}
c.Lock()
cs := c.globalStore
c.Unlock()
networkKey := datastore.Key(datastore.NetworkKeyPrefix)
if err := ensureKeys(networkKey, cs); err != nil {
return fmt.Errorf("failed to ensure if the network keys are valid and present in store: %v", err)
}
nwPairs, err := cs.KVStore().WatchTree(networkKey, nil)
if err != nil {
return err
}
go func() {
for {
select {
case nws := <-nwPairs:
c.Lock()
tmpview := networkTable{}
lview := c.networks
c.Unlock()
for k, v := range lview {
if v.isGlobalScoped() {
tmpview[k] = v
}
}
c.processNetworkUpdate(nws, &tmpview)
// Delete processing
for k := range tmpview {
c.Lock()
existing, ok := c.networks[k]
c.Unlock()
if !ok {
continue
}
tmp := network{}
if err := c.globalStore.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
continue
}
if err := existing.deleteNetwork(); err != nil {
log.Debugf("Delete failed %s: %s", existing.name, err)
}
}
}
}
}()
return nil
}
func (n *network) watchEndpoints() error {
if n.Skip() || !n.ctrlr.validateGlobalStoreConfig() {
return nil
}
n.Lock()
cs := n.ctrlr.globalStore
tmp := endpoint{network: n}
n.stopWatchCh = make(chan struct{})
stopCh := n.stopWatchCh
n.Unlock()
endpointKey := datastore.Key(tmp.KeyPrefix()...)
if err := ensureKeys(endpointKey, cs); err != nil {
return fmt.Errorf("failed to ensure if the endpoint keys are valid and present in store: %v", err)
}
epPairs, err := cs.KVStore().WatchTree(endpointKey, stopCh)
if err != nil {
return err
}
go func() {
for {
select {
case <-stopCh:
return
case eps := <-epPairs:
n.Lock()
tmpview := endpointTable{}
lview := n.endpoints
n.Unlock()
for k, v := range lview {
if v.network.isGlobalScoped() {
tmpview[k] = v
}
}
n.ctrlr.processEndpointsUpdate(eps, &tmpview)
// Delete processing
for k := range tmpview {
n.Lock()
existing, ok := n.endpoints[k]
n.Unlock()
if !ok {
continue
}
tmp := endpoint{}
if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
continue
}
if err := existing.deleteEndpoint(); err != nil {
log.Debugf("Delete failed %s: %s", existing.name, err)
}
}
}
}
}()
return nil
}
func (n *network) stopWatch() {
n.Lock()
if n.stopWatchCh != nil {
close(n.stopWatchCh)
n.stopWatchCh = nil
}
n.Unlock()
}
func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) {
for _, kve := range nws {
var n network
err := json.Unmarshal(kve.Value, &n)
if err != nil {
log.Error(err)
continue
}
if prune != nil {
delete(*prune, n.id)
}
n.SetIndex(kve.LastIndex)
c.Lock()
existing, ok := c.networks[n.id]
if c.cfg == nil {
c.Unlock()
if ok {
existing.Lock()
// Skip existing network update
if existing.dbIndex != n.Index() {
// Can't use SetIndex() since existing is locked.
existing.dbIndex = n.Index()
existing.dbExists = true
existing.endpointCnt = n.endpointCnt
}
existing.Unlock()
continue
}
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
}
}
}
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
nw := ep.network
if nw == nil {
return true
}
nw.Lock()
id := nw.id
nw.Unlock()
c.Lock()
n, ok := c.networks[id]
c.Unlock()
if !ok {
return true
}
existing, _ := n.EndpointByID(ep.id)
if existing == nil {
return true
}
ee := existing.(*endpoint)
ee.Lock()
if ee.dbIndex != ep.Index() {
// Can't use SetIndex() because ee is locked.
ee.dbIndex = ep.Index()
ee.dbExists = true
ee.sandboxID = ep.sandboxID
}
ee.Unlock()
return false
}
func ensureKeys(key string, cs datastore.DataStore) error {
exists, err := cs.KVStore().Exists(key)
if err != nil {
return err
}
if exists {
return nil
}
return cs.KVStore().Put(key, []byte{}, nil)
}
scopeConfigs := c.cfg.Scopes
c.Unlock()
func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCfg {
if cfg != nil && cfg.LocalStore.Client.Provider != "" && cfg.LocalStore.Client.Address != "" {
return &cfg.LocalStore
for scope, scfg := range scopeConfigs {
store, err := datastore.NewDataStore(scope, scfg)
if err != nil {
return err
}
c.Lock()
c.stores = append(c.stores, store)
c.Unlock()
}
return &defaultLocalStoreConfig
c.startWatch()
return nil
}
func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) {
func (c *controller) closeStores() {
for _, store := range c.getStores() {
store.Close()
}
}
func (c *controller) getStore(scope string) datastore.DataStore {
c.Lock()
if dataScope == datastore.GlobalScope {
dataStore = c.globalStore
} else if dataScope == datastore.LocalScope {
dataStore = c.localStore
defer c.Unlock()
for _, store := range c.stores {
if store.Scope() == scope {
return store
}
}
return nil
}
func (c *controller) getStores() []datastore.DataStore {
c.Lock()
defer c.Unlock()
return c.stores
}
func (c *controller) getNetworkFromStore(nid string) (*network, error) {
for _, store := range c.getStores() {
n := &network{id: nid, ctrlr: c}
err := store.GetObject(datastore.Key(n.Key()...), n)
if err != nil && err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("could not find network %s: %v", nid, err)
}
// Continue searching in the next store if the key is not found in this store
if err == datastore.ErrKeyNotFound {
continue
}
return n, nil
}
return nil, fmt.Errorf("network %s not found", nid)
}
func (c *controller) getNetworksFromStore() ([]*network, error) {
var nl []*network
for _, store := range c.getStores() {
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
&network{ctrlr: c})
if err != nil && err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("failed to get networks for scope %s: %v",
store.Scope(), err)
}
// Continue searching in the next store if no keys found in this store
if err == datastore.ErrKeyNotFound {
continue
}
for _, kvo := range kvol {
n := kvo.(*network)
n.ctrlr = c
nl = append(nl, n)
}
}
return nl, nil
}
func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
for _, store := range n.ctrlr.getStores() {
ep := &endpoint{id: eid, network: n}
err := store.GetObject(datastore.Key(ep.Key()...), ep)
if err != nil && err != datastore.ErrKeyNotFound {
return nil, fmt.Errorf("could not find endpoint %s: %v", eid, err)
}
// Continue searching in the next store if the key is not found in this store
if err == datastore.ErrKeyNotFound {
continue
}
return ep, nil
}
return nil, fmt.Errorf("endpoint %s not found", eid)
}
func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
var epl []*endpoint
tmp := endpoint{network: n}
for _, store := range n.getController().getStores() {
kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
if err != nil && err != datastore.ErrKeyNotFound {
return nil,
fmt.Errorf("failed to get endpoints for network %s scope %s: %v",
n.Name(), store.Scope(), err)
}
// Continue searching in the next store if no keys found in this store
if err == datastore.ErrKeyNotFound {
continue
}
for _, kvo := range kvol {
ep := kvo.(*endpoint)
ep.network = n
epl = append(epl, ep)
}
}
return epl, nil
}
func (c *controller) updateToStore(kvObject datastore.KVObject) error {
cs := c.getStore(kvObject.DataScope())
if cs == nil {
log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
return nil
}
if err := cs.PutObjectAtomic(kvObject); err != nil {
return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
}
return nil
}
func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
cs := c.getStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
return nil
}
retry:
if err := cs.DeleteObjectAtomic(kvObject); err != nil {
if err == datastore.ErrKeyModified {
if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
}
goto retry
}
return err
}
return nil
}
type netWatch struct {
localEps map[string]*endpoint
remoteEps map[string]*endpoint
stopCh chan struct{}
}
func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
c.Lock()
defer c.Unlock()
var epl []*endpoint
for _, ep := range nw.localEps {
epl = append(epl, ep)
}
return epl
}
func (c *controller) watchSvcRecord(ep *endpoint) {
c.watchCh <- ep
}
func (c *controller) unWatchSvcRecord(ep *endpoint) {
c.unWatchCh <- ep
}
func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) {
for {
select {
case <-nw.stopCh:
return
case o := <-nCh:
n := o.(*network)
epl, err := n.getEndpointsFromStore()
if err != nil {
break
}
c.Lock()
var addEp []*endpoint
delEpMap := make(map[string]*endpoint)
for k, v := range nw.remoteEps {
delEpMap[k] = v
}
for _, lEp := range epl {
if _, ok := nw.localEps[lEp.ID()]; ok {
continue
}
if _, ok := nw.remoteEps[lEp.ID()]; ok {
delete(delEpMap, lEp.ID())
continue
}
nw.remoteEps[lEp.ID()] = lEp
addEp = append(addEp, lEp)
}
c.Unlock()
for _, lEp := range addEp {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
}
for _, lEp := range delEpMap {
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
}
}
}
}
func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
c.Lock()
nw, ok := nmap[ep.getNetwork().ID()]
c.Unlock()
if ok {
// Update the svc db for the local endpoint join right away
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
c.Lock()
nw.localEps[ep.ID()] = ep
c.Unlock()
return
}
nw = &netWatch{
localEps: make(map[string]*endpoint),
remoteEps: make(map[string]*endpoint),
}
// Update the svc db for the local endpoint join right away
// Do this before adding this ep to localEps so that we don't
// try to update this ep's container's svc records
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
c.Lock()
nw.localEps[ep.ID()] = ep
nmap[ep.getNetwork().ID()] = nw
nw.stopCh = make(chan struct{})
c.Unlock()
store := c.getStore(ep.getNetwork().DataScope())
if store == nil {
return
}
if !store.Watchable() {
return
}
ch, err := store.Watch(ep.getNetwork(), nw.stopCh)
if err != nil {
log.Warnf("Error creating watch for network: %v", err)
return
}
go c.networkWatchLoop(nw, ep, ch)
}
func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
c.Lock()
nw, ok := nmap[ep.getNetwork().ID()]
if ok {
delete(nw.localEps, ep.ID())
c.Unlock()
// Update the svc db about local endpoint leave right away
// Do this after we remove this ep from localEps so that we
// don't try to remove this svc record from this ep's container.
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), false)
c.Lock()
if len(nw.localEps) == 0 {
close(nw.stopCh)
delete(nmap, ep.getNetwork().ID())
}
}
c.Unlock()
return
}
func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) {
for _, epe := range eps {
var ep endpoint
err := json.Unmarshal(epe.Value, &ep)
if err != nil {
log.Error(err)
continue
}
if prune != nil {
delete(*prune, ep.id)
}
ep.SetIndex(epe.LastIndex)
if nid, err := ep.networkIDFromKey(epe.Key); err != nil {
log.Error(err)
continue
} else {
if n, err := c.NetworkByID(nid); err != nil {
log.Error(err)
continue
} else {
ep.network = n.(*network)
}
}
if c.processEndpointUpdate(&ep) {
err = c.newEndpointFromStore(epe.Key, &ep)
if err != nil {
log.Error(err)
}
func (c *controller) watchLoop(nmap map[string]*netWatch) {
for {
select {
case ep := <-c.watchCh:
c.processEndpointCreate(nmap, ep)
case ep := <-c.unWatchCh:
c.processEndpointDelete(nmap, ep)
}
}
}
func (c *controller) startWatch() {
c.watchCh = make(chan *endpoint)
c.unWatchCh = make(chan *endpoint)
nmap := make(map[string]*netWatch)
go c.watchLoop(nmap)
}

View File

@ -33,7 +33,7 @@ func testNewController(t *testing.T, provider, url string) (NetworkController, e
}
func TestBoltdbBackend(t *testing.T) {
defer os.Remove(defaultLocalStoreConfig.Client.Address)
defer os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
testLocalBackend(t, "", "", nil)
defer os.Remove("/tmp/boltdb.db")
config := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second}
@ -64,7 +64,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.(*controller).localStore.KVStore()
store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore()
if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); !exists || err != nil {
t.Fatalf("Network key should have been created.")
}
@ -100,7 +100,7 @@ func TestNoPersist(t *testing.T) {
if err != nil {
t.Fatalf("Error creating endpoint: %v", err)
}
store := ctrl.(*controller).localStore.KVStore()
store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore()
if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); exists {
t.Fatalf("Network with persist=false should not be stored in KV Store")
}
@ -138,12 +138,8 @@ func TestLocalStoreLockTimeout(t *testing.T) {
}
defer ctrl1.Stop()
// Use the same boltdb file without closing the previous controller
ctrl2, err := New(cfgOptions...)
if err != nil {
t.Fatalf("Error new controller: %v", err)
}
store := ctrl2.(*controller).localStore
if store != nil {
t.Fatalf("localstore is expected to be nil")
_, err = New(cfgOptions...)
if err == nil {
t.Fatalf("Expected to fail but succeeded")
}
}