Fix for swarm/libnetwork init race condition

This change cleans up the SetClusterProvider method.
Swarm calls SetClusterProvider to pass libnetwork a pointer to the
provider, from which libnetwork can fetch all the information needed to
initialize its internal agent.

The method can be, and is, called multiple times with the same value.
The previous logic erroneously spawned a new goroutine on each call,
making a race between an agentInit and an agentClose possible.
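
For illustration, a minimal, self-contained sketch of the failure mode
(hypothetical names, not the actual controller code): every call spawns
another listener goroutine, and two goroutines consuming the same events
can interleave an init with a close.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // ctrl is a stand-in for the libnetwork controller.
    type ctrl struct {
        sync.Mutex
        agentUp bool
    }

    func (c *ctrl) agentInit()  { c.Lock(); c.agentUp = true; c.Unlock() }
    func (c *ctrl) agentClose() { c.Lock(); c.agentUp = false; c.Unlock() }

    // setProviderOld mimics the previous logic: a new listener goroutine
    // is spawned on every call, even when the provider is unchanged.
    func (c *ctrl) setProviderOld(events <-chan bool) {
        go func() {
            for up := range events {
                if up {
                    c.agentInit() // goroutine A may be initializing...
                } else {
                    c.agentClose() // ...while goroutine B is closing.
                }
            }
        }()
    }

    func main() {
        c := &ctrl{}
        events := make(chan bool, 4)
        c.setProviderOld(events) // called twice with the same value,
        c.setProviderOld(events) // exactly as the daemon does
        events <- true
        events <- false
        time.Sleep(100 * time.Millisecond)
        fmt.Println("agentUp:", c.agentUp) // depends on the interleaving
    }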

The new logic prevents this by checking the provider passed in: if the
provider is already set, there is nothing to do, because an active
goroutine is already in place to process cluster events.
Moreover, a patch on the moby side cleans up the cluster event
dispatching, using a single channel to handle all the event types.
This also guarantees in-order event handling, because all the events
are now piped into one channel.
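
A minimal, self-contained sketch of the single-channel model (the event
constants mirror those added in cluster/cluster.go below; the handler
bodies are placeholders for agentSetup/agentClose):

    package main

    import "fmt"

    // ConfigEventType mirrors the type added in the cluster package.
    type ConfigEventType uint8

    const (
        EventSocketChange ConfigEventType = iota
        EventNodeReady
        EventNodeLeave
        EventNetworkKeysAvailable
    )

    func main() {
        // One buffered channel carries every event type, so the consumer
        // observes the events strictly in the order they were sent.
        events := make(chan ConfigEventType, 10)
        events <- EventNetworkKeysAvailable
        events <- EventNodeReady
        events <- EventNodeLeave
        close(events)

        keysAvailable := false
        for ev := range events {
            switch ev {
            case EventNetworkKeysAvailable:
                keysAvailable = true
                fallthrough // as in the real handler: try to initialize now
            case EventSocketChange, EventNodeReady:
                if keysAvailable {
                    fmt.Println("initialize agent") // placeholder
                }
            case EventNodeLeave:
                keysAvailable = false
                fmt.Println("tear down agent") // placeholder
            }
        }
    }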

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani 2017-05-03 11:18:33 -07:00
parent 8c113c7fe4
commit a2bf0b35d6
5 changed files with 121 additions and 90 deletions


@@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
@@ -40,7 +41,7 @@ type agent struct {
bindAddr string
advertiseAddr string
dataPathAddr string
epTblCancel func()
coreCancelFuncs []func()
driverCancelFuncs map[string][]func()
sync.Mutex
}
@@ -192,16 +193,12 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
return nil
}
func (c *controller) agentSetup() error {
c.Lock()
clusterProvider := c.cfg.Daemon.ClusterProvider
agent := c.agent
c.Unlock()
func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
agent := c.getAgent()
if clusterProvider == nil {
msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset"
logrus.Errorf(msg)
return fmt.Errorf(msg)
// If the agent is already present there is no need to try to initialize it again
if agent != nil {
return nil
}
bindAddr := clusterProvider.GetLocalAddress()
@@ -221,15 +218,15 @@ func (c *controller) agentSetup() error {
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err)
} else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})
logrus.Errorf("error in agentInit: %v", err)
return err
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.DataScope == datastore.GlobalScope {
c.agentDriverNotify(driver)
}
return false
})
}
if len(remoteAddrList) > 0 {
@@ -238,14 +235,6 @@ func (c *controller) agentSetup() error {
}
}
c.Lock()
if c.agent != nil && c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
c.agentStopDone = make(chan struct{})
}
c.Unlock()
return nil
}
@@ -287,16 +276,12 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
}
func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
if !c.isAgent() {
return nil
}
bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil {
return err
}
keys, tags := c.getKeys(subsysGossip)
keys, _ := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)
@@ -312,8 +297,11 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
return err
}
var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
cancelList = append(cancelList, cancel)
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
cancelList = append(cancelList, cancel)
c.Lock()
c.agent = &agent{
@@ -321,7 +309,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
bindAddr: bindAddr,
advertiseAddr: advertiseAddr,
dataPathAddr: dataPathAddr,
epTblCancel: cancel,
coreCancelFuncs: cancelList,
driverCancelFuncs: make(map[string][]func()),
}
c.Unlock()
@@ -330,7 +318,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
drvEnc := discoverapi.DriverEncryptionConfig{}
keys, tags = c.getKeys(subsysIPSec)
keys, tags := c.getKeys(subsysIPSec)
drvEnc.Keys = keys
drvEnc.Tags = tags
@@ -399,14 +387,17 @@ func (c *controller) agentClose() {
cancelList = append(cancelList, cancel)
}
}
// Also add the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
agent.Unlock()
for _, cancel := range cancelList {
cancel()
}
agent.epTblCancel()
agent.networkDB.Close()
}


@@ -5,6 +5,20 @@ import (
"golang.org/x/net/context"
)
const (
// EventSocketChange control socket changed
EventSocketChange = iota
// EventNodeReady cluster node in ready state
EventNodeReady
// EventNodeLeave node is leaving the cluster
EventNodeLeave
// EventNetworkKeysAvailable network keys correctly configured in the networking layer
EventNetworkKeysAvailable
)
// ConfigEventType type of the event produced by the cluster
type ConfigEventType uint8
// Provider provides clustering config details
type Provider interface {
IsManager() bool
@@ -14,7 +28,7 @@ type Provider interface {
GetAdvertiseAddress() string
GetDataPathAddress() string
GetRemoteAddressList() []string
ListenClusterEvents() <-chan struct{}
ListenClusterEvents() <-chan ConfigEventType
AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)
DetachNetwork(string, string) error
UpdateAttachment(string, string, *network.NetworkingConfig) error


@@ -27,6 +27,7 @@ import (
"github.com/docker/docker/pkg/term"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/api"
"github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/config"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
@@ -234,7 +235,7 @@ type dnetConnection struct {
// addr holds the client address.
addr string
Orchestration *NetworkOrchestration
configEvent chan struct{}
configEvent chan cluster.ConfigEventType
}
// NetworkOrchestration exported
@@ -275,7 +276,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
controller.SetClusterProvider(d)
if d.Orchestration.Agent || d.Orchestration.Manager {
d.configEvent <- struct{}{}
d.configEvent <- cluster.EventNodeReady
}
createDefaultNetwork(controller)
@@ -335,7 +336,7 @@ func (d *dnetConnection) GetNetworkKeys() []*types.EncryptionKey {
func (d *dnetConnection) SetNetworkKeys([]*types.EncryptionKey) {
}
func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
func (d *dnetConnection) ListenClusterEvents() <-chan cluster.ConfigEventType {
return d.configEvent
}
@@ -438,7 +439,7 @@ func newDnetConnection(val string) (*dnetConnection, error) {
return nil, errors.New("dnet currently only supports tcp transport")
}
return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan cluster.ConfigEventType, 10)}, nil
}
func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {


@@ -34,7 +34,6 @@ type DaemonCfg struct {
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
DisableProvider chan struct{}
}
// ClusterCfg represents cluster configuration
@@ -74,8 +73,7 @@ func ParseConfig(tomlCfgFile string) (*Config, error) {
func ParseConfigOptions(cfgOptions ...Option) *Config {
cfg := &Config{
Daemon: DaemonCfg{
DriverCfg: make(map[string]interface{}),
DisableProvider: make(chan struct{}, 10),
DriverCfg: make(map[string]interface{}),
},
Scopes: make(map[string]*datastore.ScopeCfg),
}


@@ -244,15 +244,24 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
}
func (c *controller) SetClusterProvider(provider cluster.Provider) {
var sameProvider bool
c.Lock()
c.cfg.Daemon.ClusterProvider = provider
disableProviderCh := c.cfg.Daemon.DisableProvider
c.Unlock()
if provider != nil {
go c.clusterAgentInit()
// Avoid spawning multiple goroutines for the same cluster provider
if c.cfg.Daemon.ClusterProvider == provider {
// If the cluster provider is already set, there is already a goroutine spawned
// that is listening for events, so there is nothing to do here
sameProvider = true
} else {
disableProviderCh <- struct{}{}
c.cfg.Daemon.ClusterProvider = provider
}
c.Unlock()
if provider == nil || sameProvider {
return
}
// We don't want to spawn a new goroutine if the previous one has not exited yet
c.AgentStopWait()
go c.clusterAgentInit()
}
func isValidClusteringIP(addr string) bool {
@@ -262,12 +271,6 @@ func isValidClusteringIP(addr string) bool {
// libnetwork side of agent depends on the keys. On the first receipt of
// keys setup the agent. For subsequent key set handle the key change
func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
c.Lock()
existingKeys := c.keys
clusterConfigAvailable := c.clusterConfigAvailable
agent := c.agent
c.Unlock()
subsysKeys := make(map[string]int)
for _, key := range keys {
if key.Subsystem != subsysGossip &&
@@ -282,19 +285,8 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error {
}
}
if len(existingKeys) == 0 {
c.Lock()
c.keys = keys
c.Unlock()
if agent != nil {
return (fmt.Errorf("libnetwork agent setup without keys"))
}
if clusterConfigAvailable {
return c.agentSetup()
}
logrus.Debug("received encryption keys before cluster config")
return nil
}
agent := c.getAgent()
if agent == nil {
c.Lock()
c.keys = keys
@@ -312,24 +304,32 @@ func (c *controller) getAgent() *agent {
func (c *controller) clusterAgentInit() {
clusterProvider := c.cfg.Daemon.ClusterProvider
var keysAvailable bool
for {
select {
case <-clusterProvider.ListenClusterEvents():
if !c.isDistributedControl() {
c.Lock()
c.clusterConfigAvailable = true
keys := c.keys
c.Unlock()
// agent initialization needs encryption keys and bind/remote IP which
// comes from the daemon cluster events
if len(keys) > 0 {
c.agentSetup()
eventType := <-clusterProvider.ListenClusterEvents()
// The events EventSocketChange, EventNodeReady and EventNetworkKeysAvailable are not ordered;
// when all the conditions for the agent initialization are met, proceed with it
switch eventType {
case cluster.EventNetworkKeysAvailable:
// Validate that the keys are actually available before starting the initialization.
// This handles old spurious messages left on the channel
c.Lock()
keysAvailable = c.keys != nil
c.Unlock()
fallthrough
case cluster.EventSocketChange, cluster.EventNodeReady:
if keysAvailable && !c.isDistributedControl() {
c.agentOperationStart()
if err := c.agentSetup(clusterProvider); err != nil {
c.agentStopComplete()
} else {
c.agentInitComplete()
}
}
case <-c.cfg.Daemon.DisableProvider:
case cluster.EventNodeLeave:
keysAvailable = false
c.agentOperationStart()
c.Lock()
c.clusterConfigAvailable = false
c.agentInitDone = make(chan struct{})
c.keys = nil
c.Unlock()
@@ -343,20 +343,14 @@
c.agentClose()
c.cleanupServiceBindings("")
c.Lock()
if c.agentStopDone != nil {
close(c.agentStopDone)
c.agentStopDone = nil
}
c.Unlock()
c.agentStopComplete()
return
}
}
}
// AgentInitWait waits for agent initialization to be completed in the
// controller.
// AgentInitWait waits for agent initialization to be completed in the controller.
func (c *controller) AgentInitWait() {
c.Lock()
agentInitDone := c.agentInitDone
@@ -367,6 +361,7 @@
}
}
// AgentStopWait waits for the Agent stop to be completed in the controller
func (c *controller) AgentStopWait() {
c.Lock()
agentStopDone := c.agentStopDone
@@ -376,6 +371,38 @@
}
}
// agentOperationStart marks the start of an Agent Init or Agent Stop
func (c *controller) agentOperationStart() {
c.Lock()
if c.agentInitDone == nil {
c.agentInitDone = make(chan struct{})
}
if c.agentStopDone == nil {
c.agentStopDone = make(chan struct{})
}
c.Unlock()
}
// agentInitComplete notifies the successful completion of the Agent initialization
func (c *controller) agentInitComplete() {
c.Lock()
if c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
}
c.Unlock()
}
// agentStopComplete notifies the successful completion of the Agent stop
func (c *controller) agentStopComplete() {
c.Lock()
if c.agentStopDone != nil {
close(c.agentStopDone)
c.agentStopDone = nil
}
c.Unlock()
}
func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
if c.cfg == nil {
return nil