1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Libnetwork vendoring

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-05-09 18:27:34 -07:00
parent e2ec006797
commit 385176980e
No known key found for this signature in database
GPG key ID: 28CAFCE754CF3A48
5 changed files with 117 additions and 87 deletions

View file

@ -25,7 +25,7 @@ github.com/imdario/mergo 0.2.1
golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0 golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0
#get libnetwork packages #get libnetwork packages
github.com/docker/libnetwork b015d4b1bcf4e666d8950651c8cc825a02842e7a github.com/docker/libnetwork 6786135bf7de08ec26a72a6f7e4291d27d113a3f
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events" "github.com/docker/go-events"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi" "github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/driverapi"
@ -40,7 +41,7 @@ type agent struct {
bindAddr string bindAddr string
advertiseAddr string advertiseAddr string
dataPathAddr string dataPathAddr string
epTblCancel func() coreCancelFuncs []func()
driverCancelFuncs map[string][]func() driverCancelFuncs map[string][]func()
sync.Mutex sync.Mutex
} }
@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
return nil return nil
} }
func (c *controller) agentSetup() error { func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
c.Lock() agent := c.getAgent()
clusterProvider := c.cfg.Daemon.ClusterProvider
agent := c.agent
c.Unlock()
if clusterProvider == nil { // If the agent is already present there is no need to try to initilize it again
msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset" if agent != nil {
logrus.Errorf(msg) return nil
return fmt.Errorf(msg)
} }
bindAddr := clusterProvider.GetLocalAddress() bindAddr := clusterProvider.GetLocalAddress()
@ -221,8 +218,9 @@ func (c *controller) agentSetup() error {
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList) listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
if advAddr != "" && agent == nil { if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil { if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err) logrus.Errorf("error in agentInit: %v", err)
} else { return err
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope { if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver) c.agentDriverNotify(driver)
@ -230,7 +228,6 @@ func (c *controller) agentSetup() error {
return false return false
}) })
} }
}
if len(remoteAddrList) > 0 { if len(remoteAddrList) > 0 {
if err := c.agentJoin(remoteAddrList); err != nil { if err := c.agentJoin(remoteAddrList); err != nil {
@ -238,14 +235,6 @@ func (c *controller) agentSetup() error {
} }
} }
c.Lock()
if c.agent != nil && c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
c.agentStopDone = make(chan struct{})
}
c.Unlock()
return nil return nil
} }
@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
} }
func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error { func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
if !c.isAgent() {
return nil
}
bindAddr, err := resolveAddr(bindAddrOrInterface) bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil { if err != nil {
return err return err
} }
keys, tags := c.getKeys(subsysGossip) keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID()) nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName) logrus.Info("Gossip cluster hostname ", nodeName)
@ -312,8 +297,11 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
return err return err
} }
var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "", "") ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
cancelList = append(cancelList, cancel)
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "") nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
cancelList = append(cancelList, cancel)
c.Lock() c.Lock()
c.agent = &agent{ c.agent = &agent{
@ -321,7 +309,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
bindAddr: bindAddr, bindAddr: bindAddr,
advertiseAddr: advertiseAddr, advertiseAddr: advertiseAddr,
dataPathAddr: dataPathAddr, dataPathAddr: dataPathAddr,
epTblCancel: cancel, coreCancelFuncs: cancelList,
driverCancelFuncs: make(map[string][]func()), driverCancelFuncs: make(map[string][]func()),
} }
c.Unlock() c.Unlock()
@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
go c.handleTableEvents(nodeCh, c.handleNodeTableEvent) go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
drvEnc := discoverapi.DriverEncryptionConfig{} drvEnc := discoverapi.DriverEncryptionConfig{}
keys, tags = c.getKeys(subsysIPSec) keys, tags := c.getKeys(subsysIPSec)
drvEnc.Keys = keys drvEnc.Keys = keys
drvEnc.Tags = tags drvEnc.Tags = tags
@ -399,14 +387,17 @@ func (c *controller) agentClose() {
cancelList = append(cancelList, cancel) cancelList = append(cancelList, cancel)
} }
} }
// Add also the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
agent.Unlock() agent.Unlock()
for _, cancel := range cancelList { for _, cancel := range cancelList {
cancel() cancel()
} }
agent.epTblCancel()
agent.networkDB.Close() agent.networkDB.Close()
} }

View file

@ -5,6 +5,20 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
// EventSocketChange control socket changed
EventSocketChange = iota
// EventNodeReady cluster node in ready state
EventNodeReady
// EventNodeLeave node is leaving the cluster
EventNodeLeave
// EventNetworkKeysAvailable network keys correctly configured in the networking layer
EventNetworkKeysAvailable
)
// ConfigEventType type of the event produced by the cluster
type ConfigEventType uint8
// Provider provides clustering config details // Provider provides clustering config details
type Provider interface { type Provider interface {
IsManager() bool IsManager() bool
@ -14,7 +28,7 @@ type Provider interface {
GetAdvertiseAddress() string GetAdvertiseAddress() string
GetDataPathAddress() string GetDataPathAddress() string
GetRemoteAddressList() []string GetRemoteAddressList() []string
ListenClusterEvents() <-chan struct{} ListenClusterEvents() <-chan ConfigEventType
AttachNetwork(string, string, []string) (*network.NetworkingConfig, error) AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
DetachNetwork(string, string) error DetachNetwork(string, string) error
UpdateAttachment(string, string, *network.NetworkingConfig) error UpdateAttachment(string, string, *network.NetworkingConfig) error

View file

@ -34,7 +34,6 @@ type DaemonCfg struct {
Labels []string Labels []string
DriverCfg map[string]interface{} DriverCfg map[string]interface{}
ClusterProvider cluster.Provider ClusterProvider cluster.Provider
DisableProvider chan struct{}
} }
// ClusterCfg represents cluster configuration // ClusterCfg represents cluster configuration
@ -75,7 +74,6 @@ func ParseConfigOptions(cfgOptions ...Option) *Config {
cfg := &Config{ cfg := &Config{
Daemon: DaemonCfg{ Daemon: DaemonCfg{
DriverCfg: make(map[string]interface{}), DriverCfg: make(map[string]interface{}),
DisableProvider: make(chan struct{}, 10),
}, },
Scopes: make(map[string]*datastore.ScopeCfg), Scopes: make(map[string]*datastore.ScopeCfg),
} }

View file

@ -244,15 +244,24 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
} }
func (c *controller) SetClusterProvider(provider cluster.Provider) { func (c *controller) SetClusterProvider(provider cluster.Provider) {
var sameProvider bool
c.Lock() c.Lock()
c.cfg.Daemon.ClusterProvider = provider // Avoids to spawn multiple goroutine for the same cluster provider
disableProviderCh := c.cfg.Daemon.DisableProvider if c.cfg.Daemon.ClusterProvider == provider {
c.Unlock() // If the cluster provider is already set, there is already a go routine spawned
if provider != nil { // that is listening for events, so nothing to do here
go c.clusterAgentInit() sameProvider = true
} else { } else {
disableProviderCh <- struct{}{} c.cfg.Daemon.ClusterProvider = provider
} }
c.Unlock()
if provider == nil || sameProvider {
return
}
// We don't want to spawn a new go routine if the previous one did not exit yet
c.AgentStopWait()
go c.clusterAgentInit()
} }
func isValidClusteringIP(addr string) bool { func isValidClusteringIP(addr string) bool {
@ -262,12 +271,6 @@ func isValidClusteringIP(addr string) bool {
// libnetwork side of agent depends on the keys. On the first receipt of // libnetwork side of agent depends on the keys. On the first receipt of
// keys setup the agent. For subsequent key set handle the key change // keys setup the agent. For subsequent key set handle the key change
func (c *controller) SetKeys(keys []*types.EncryptionKey) error { func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
c.Lock()
existingKeys := c.keys
clusterConfigAvailable := c.clusterConfigAvailable
agent := c.agent
c.Unlock()
subsysKeys := make(map[string]int) subsysKeys := make(map[string]int)
for _, key := range keys { for _, key := range keys {
if key.Subsystem != subsysGossip && if key.Subsystem != subsysGossip &&
@ -282,19 +285,8 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
} }
} }
if len(existingKeys) == 0 { agent := c.getAgent()
c.Lock()
c.keys = keys
c.Unlock()
if agent != nil {
return (fmt.Errorf("libnetwork agent setup without keys"))
}
if clusterConfigAvailable {
return c.agentSetup()
}
logrus.Debug("received encryption keys before cluster config")
return nil
}
if agent == nil { if agent == nil {
c.Lock() c.Lock()
c.keys = keys c.keys = keys
@ -312,24 +304,32 @@ func (c *controller) getAgent() *agent {
func (c *controller) clusterAgentInit() { func (c *controller) clusterAgentInit() {
clusterProvider := c.cfg.Daemon.ClusterProvider clusterProvider := c.cfg.Daemon.ClusterProvider
var keysAvailable bool
for { for {
select { eventType := <-clusterProvider.ListenClusterEvents()
case <-clusterProvider.ListenClusterEvents(): // The events: EventSocketChange, EventNodeReady and EventNetworkKeysAvailable are not ordered
if !c.isDistributedControl() { // when all the condition for the agent initialization are met then proceed with it
switch eventType {
case cluster.EventNetworkKeysAvailable:
// Validates that the keys are actually available before starting the initialization
// This will handle old spurious messages left on the channel
c.Lock() c.Lock()
c.clusterConfigAvailable = true keysAvailable = c.keys != nil
keys := c.keys
c.Unlock() c.Unlock()
// agent initialization needs encryption keys and bind/remote IP which fallthrough
// comes from the daemon cluster events case cluster.EventSocketChange, cluster.EventNodeReady:
if len(keys) > 0 { if keysAvailable && !c.isDistributedControl() {
c.agentSetup() c.agentOperationStart()
if err := c.agentSetup(clusterProvider); err != nil {
c.agentStopComplete()
} else {
c.agentInitComplete()
} }
} }
case <-c.cfg.Daemon.DisableProvider: case cluster.EventNodeLeave:
keysAvailable = false
c.agentOperationStart()
c.Lock() c.Lock()
c.clusterConfigAvailable = false
c.agentInitDone = make(chan struct{})
c.keys = nil c.keys = nil
c.Unlock() c.Unlock()
@ -343,20 +343,14 @@ func (c *controller) clusterAgentInit() {
c.agentClose() c.agentClose()
c.cleanupServiceBindings("") c.cleanupServiceBindings("")
c.Lock() c.agentStopComplete()
if c.agentStopDone != nil {
close(c.agentStopDone)
c.agentStopDone = nil
}
c.Unlock()
return return
} }
} }
} }
// AgentInitWait waits for agent initialization to be completed in the // AgentInitWait waits for agent initialization to be completed in the controller.
// controller.
func (c *controller) AgentInitWait() { func (c *controller) AgentInitWait() {
c.Lock() c.Lock()
agentInitDone := c.agentInitDone agentInitDone := c.agentInitDone
@ -367,6 +361,7 @@ func (c *controller) AgentInitWait() {
} }
} }
// AgentStopWait waits for the Agent stop to be completed in the controller
func (c *controller) AgentStopWait() { func (c *controller) AgentStopWait() {
c.Lock() c.Lock()
agentStopDone := c.agentStopDone agentStopDone := c.agentStopDone
@ -376,6 +371,38 @@ func (c *controller) AgentStopWait() {
} }
} }
// agentOperationStart marks the start of an Agent Init or Agent Stop
func (c *controller) agentOperationStart() {
c.Lock()
if c.agentInitDone == nil {
c.agentInitDone = make(chan struct{})
}
if c.agentStopDone == nil {
c.agentStopDone = make(chan struct{})
}
c.Unlock()
}
// agentInitComplete notifies the successful completion of the Agent initialization
func (c *controller) agentInitComplete() {
c.Lock()
if c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
}
c.Unlock()
}
// agentStopComplete notifies the successful completion of the Agent stop
func (c *controller) agentStopComplete() {
c.Lock()
if c.agentStopDone != nil {
close(c.agentStopDone)
c.agentStopDone = nil
}
c.Unlock()
}
func (c *controller) makeDriverConfig(ntype string) map[string]interface{} { func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
if c.cfg == nil { if c.cfg == nil {
return nil return nil