1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/network.go
Flavio Crisciani e2ec006797
Fix race condition between swarm and libnetwork
This commit in conjunction with a libnetwork side commit,
cleans up the libnetwork SetClusterProvider logic interaction.
The previous code was inducing libnetwork to spawn several go
routines that were racing between each other during the agent
init and close.

A test got added to verify that back to back swarm init and leave
are properly processed and not raise crashes

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
2017-05-10 21:16:52 -07:00

542 lines
16 KiB
Go

package daemon
import (
"fmt"
"net"
"runtime"
"sort"
"strings"
"sync"
"github.com/Sirupsen/logrus"
apierrors "github.com/docker/docker/api/errors"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/network"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/docker/runconfig"
"github.com/docker/libnetwork"
lncluster "github.com/docker/libnetwork/cluster"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi"
networktypes "github.com/docker/libnetwork/types"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// NetworkControllerEnabled checks if the networking stack is enabled.
// This feature depends on OS primitives and it's disabled in systems like Windows.
func (daemon *Daemon) NetworkControllerEnabled() bool {
return daemon.netController != nil
}
// FindNetwork function finds a network for a given string that can represent network name or id
func (daemon *Daemon) FindNetwork(idName string) (libnetwork.Network, error) {
// Find by Name
n, err := daemon.GetNetworkByName(idName)
if err != nil && !isNoSuchNetworkError(err) {
return nil, err
}
if n != nil {
return n, nil
}
// Find by id
return daemon.GetNetworkByID(idName)
}
func isNoSuchNetworkError(err error) bool {
_, ok := err.(libnetwork.ErrNoSuchNetwork)
return ok
}
// GetNetworkByID function returns a network whose ID begins with the given prefix.
// It fails with an error if no matching, or more than one matching, networks are found.
func (daemon *Daemon) GetNetworkByID(partialID string) (libnetwork.Network, error) {
list := daemon.GetNetworksByID(partialID)
if len(list) == 0 {
return nil, libnetwork.ErrNoSuchNetwork(partialID)
}
if len(list) > 1 {
return nil, libnetwork.ErrInvalidID(partialID)
}
return list[0], nil
}
// GetNetworkByName function returns a network for a given network name.
// If no network name is given, the default network is returned.
func (daemon *Daemon) GetNetworkByName(name string) (libnetwork.Network, error) {
c := daemon.netController
if c == nil {
return nil, libnetwork.ErrNoSuchNetwork(name)
}
if name == "" {
name = c.Config().Daemon.DefaultNetwork
}
return c.NetworkByName(name)
}
// GetNetworksByID returns a list of networks whose ID partially matches zero or more networks
func (daemon *Daemon) GetNetworksByID(partialID string) []libnetwork.Network {
c := daemon.netController
if c == nil {
return nil
}
list := []libnetwork.Network{}
l := func(nw libnetwork.Network) bool {
if strings.HasPrefix(nw.ID(), partialID) {
list = append(list, nw)
}
return false
}
c.WalkNetworks(l)
return list
}
// getAllNetworks returns a list containing all networks
func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
return daemon.netController.Networks()
}
type ingressJob struct {
create *clustertypes.NetworkCreateRequest
ip net.IP
jobDone chan struct{}
}
var (
ingressWorkerOnce sync.Once
ingressJobsChannel chan *ingressJob
ingressID string
)
func (daemon *Daemon) startIngressWorker() {
ingressJobsChannel = make(chan *ingressJob, 100)
go func() {
for {
select {
case r := <-ingressJobsChannel:
if r.create != nil {
daemon.setupIngress(r.create, r.ip, ingressID)
ingressID = r.create.ID
} else {
daemon.releaseIngress(ingressID)
ingressID = ""
}
close(r.jobDone)
}
}
}()
}
// enqueueIngressJob adds a ingress add/rm request to the worker queue.
// It guarantees the worker is started.
func (daemon *Daemon) enqueueIngressJob(job *ingressJob) {
ingressWorkerOnce.Do(daemon.startIngressWorker)
ingressJobsChannel <- job
}
// SetupIngress setups ingress networking.
// The function returns a channel which will signal the caller when the programming is completed.
func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) (<-chan struct{}, error) {
ip, _, err := net.ParseCIDR(nodeIP)
if err != nil {
return nil, err
}
done := make(chan struct{})
daemon.enqueueIngressJob(&ingressJob{&create, ip, done})
return done, nil
}
// ReleaseIngress releases the ingress networking.
// The function returns a channel which will signal the caller when the programming is completed.
func (daemon *Daemon) ReleaseIngress() (<-chan struct{}, error) {
done := make(chan struct{})
daemon.enqueueIngressJob(&ingressJob{nil, nil, done})
return done, nil
}
func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) {
controller := daemon.netController
controller.AgentInitWait()
if staleID != "" && staleID != create.ID {
daemon.releaseIngress(staleID)
}
if _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true); err != nil {
// If it is any other error other than already
// exists error log error and return.
if _, ok := err.(libnetwork.NetworkNameError); !ok {
logrus.Errorf("Failed creating ingress network: %v", err)
return
}
// Otherwise continue down the call to create or recreate sandbox.
}
n, err := daemon.GetNetworkByID(create.ID)
if err != nil {
logrus.Errorf("Failed getting ingress network by id after creating: %v", err)
}
sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress())
if err != nil {
if _, ok := err.(networktypes.ForbiddenError); !ok {
logrus.Errorf("Failed creating ingress sandbox: %v", err)
}
return
}
ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil))
if err != nil {
logrus.Errorf("Failed creating ingress endpoint: %v", err)
return
}
if err := ep.Join(sb, nil); err != nil {
logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
return
}
if err := sb.EnableService(); err != nil {
logrus.Errorf("Failed enabling service for ingress sandbox")
}
}
func (daemon *Daemon) releaseIngress(id string) {
controller := daemon.netController
if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
logrus.Errorf("Failed to delete ingress sandbox: %v", err)
}
if id == "" {
return
}
n, err := controller.NetworkByID(id)
if err != nil {
logrus.Errorf("failed to retrieve ingress network %s: %v", id, err)
return
}
for _, ep := range n.Endpoints() {
if err := ep.Delete(true); err != nil {
logrus.Errorf("Failed to delete endpoint %s (%s): %v", ep.Name(), ep.ID(), err)
return
}
}
if err := n.Delete(); err != nil {
logrus.Errorf("Failed to delete ingress network %s: %v", n.ID(), err)
return
}
return
}
// SetNetworkBootstrapKeys sets the bootstrap keys.
func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error {
err := daemon.netController.SetKeys(keys)
if err == nil {
// Upon successful key setting dispatch the keys available event
daemon.cluster.SendClusterEvent(lncluster.EventNetworkKeysAvailable)
}
return err
}
// UpdateAttachment notifies the attacher about the attachment config.
func (daemon *Daemon) UpdateAttachment(networkName, networkID, containerID string, config *network.NetworkingConfig) error {
if daemon.clusterProvider == nil {
return fmt.Errorf("cluster provider is not initialized")
}
if err := daemon.clusterProvider.UpdateAttachment(networkName, containerID, config); err != nil {
return daemon.clusterProvider.UpdateAttachment(networkID, containerID, config)
}
return nil
}
// WaitForDetachment makes the cluster manager wait for detachment of
// the container from the network.
func (daemon *Daemon) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
if daemon.clusterProvider == nil {
return fmt.Errorf("cluster provider is not initialized")
}
return daemon.clusterProvider.WaitForDetachment(ctx, networkName, networkID, taskID, containerID)
}
// CreateManagedNetwork creates an agent network.
func (daemon *Daemon) CreateManagedNetwork(create clustertypes.NetworkCreateRequest) error {
_, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true)
return err
}
// CreateNetwork creates a network with the given name, driver and other optional parameters
func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.NetworkCreateResponse, error) {
resp, err := daemon.createNetwork(create, "", false)
if err != nil {
return nil, err
}
return resp, err
}
func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) {
if runconfig.IsPreDefinedNetwork(create.Name) && !agent {
err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name)
return nil, apierrors.NewRequestForbiddenError(err)
}
var warning string
nw, err := daemon.GetNetworkByName(create.Name)
if err != nil {
if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
return nil, err
}
}
if nw != nil {
// check if user defined CheckDuplicate, if set true, return err
// otherwise prepare a warning message
if create.CheckDuplicate {
return nil, libnetwork.NetworkNameError(create.Name)
}
warning = fmt.Sprintf("Network with name %s (id : %s) already exists", nw.Name(), nw.ID())
}
c := daemon.netController
driver := create.Driver
if driver == "" {
driver = c.Config().Daemon.DefaultDriver
}
nwOptions := []libnetwork.NetworkOption{
libnetwork.NetworkOptionEnableIPv6(create.EnableIPv6),
libnetwork.NetworkOptionDriverOpts(create.Options),
libnetwork.NetworkOptionLabels(create.Labels),
libnetwork.NetworkOptionAttachable(create.Attachable),
libnetwork.NetworkOptionIngress(create.Ingress),
}
if create.IPAM != nil {
ipam := create.IPAM
v4Conf, v6Conf, err := getIpamConfig(ipam.Config)
if err != nil {
return nil, err
}
nwOptions = append(nwOptions, libnetwork.NetworkOptionIpam(ipam.Driver, "", v4Conf, v6Conf, ipam.Options))
}
if create.Internal {
nwOptions = append(nwOptions, libnetwork.NetworkOptionInternalNetwork())
}
if agent {
nwOptions = append(nwOptions, libnetwork.NetworkOptionDynamic())
nwOptions = append(nwOptions, libnetwork.NetworkOptionPersist(false))
}
n, err := c.NewNetwork(driver, create.Name, id, nwOptions...)
if err != nil {
if _, ok := err.(libnetwork.ErrDataStoreNotInitialized); ok {
return nil, errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
}
return nil, err
}
daemon.pluginRefCount(driver, driverapi.NetworkPluginEndpointType, plugingetter.Acquire)
if create.IPAM != nil {
daemon.pluginRefCount(create.IPAM.Driver, ipamapi.PluginEndpointType, plugingetter.Acquire)
}
daemon.LogNetworkEvent(n, "create")
return &types.NetworkCreateResponse{
ID: n.ID(),
Warning: warning,
}, nil
}
func (daemon *Daemon) pluginRefCount(driver, capability string, mode int) {
var builtinDrivers []string
if capability == driverapi.NetworkPluginEndpointType {
builtinDrivers = daemon.netController.BuiltinDrivers()
} else if capability == ipamapi.PluginEndpointType {
builtinDrivers = daemon.netController.BuiltinIPAMDrivers()
}
for _, d := range builtinDrivers {
if d == driver {
return
}
}
if daemon.PluginStore != nil {
_, err := daemon.PluginStore.Get(driver, capability, mode)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{"mode": mode, "driver": driver}).Error("Error handling plugin refcount operation")
}
}
}
func getIpamConfig(data []network.IPAMConfig) ([]*libnetwork.IpamConf, []*libnetwork.IpamConf, error) {
ipamV4Cfg := []*libnetwork.IpamConf{}
ipamV6Cfg := []*libnetwork.IpamConf{}
for _, d := range data {
iCfg := libnetwork.IpamConf{}
iCfg.PreferredPool = d.Subnet
iCfg.SubPool = d.IPRange
iCfg.Gateway = d.Gateway
iCfg.AuxAddresses = d.AuxAddress
ip, _, err := net.ParseCIDR(d.Subnet)
if err != nil {
return nil, nil, fmt.Errorf("Invalid subnet %s : %v", d.Subnet, err)
}
if ip.To4() != nil {
ipamV4Cfg = append(ipamV4Cfg, &iCfg)
} else {
ipamV6Cfg = append(ipamV6Cfg, &iCfg)
}
}
return ipamV4Cfg, ipamV6Cfg, nil
}
// UpdateContainerServiceConfig updates a service configuration.
func (daemon *Daemon) UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error {
container, err := daemon.GetContainer(containerName)
if err != nil {
return err
}
container.NetworkSettings.Service = serviceConfig
return nil
}
// ConnectContainerToNetwork connects the given container to the given
// network. If either cannot be found, an err is returned. If the
// network cannot be set up, an err is returned.
func (daemon *Daemon) ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error {
if runtime.GOOS == "solaris" {
return errors.New("docker network connect is unsupported on Solaris platform")
}
container, err := daemon.GetContainer(containerName)
if err != nil {
return err
}
return daemon.ConnectToNetwork(container, networkName, endpointConfig)
}
// DisconnectContainerFromNetwork disconnects the given container from
// the given network. If either cannot be found, an err is returned.
func (daemon *Daemon) DisconnectContainerFromNetwork(containerName string, networkName string, force bool) error {
if runtime.GOOS == "solaris" {
return errors.New("docker network disconnect is unsupported on Solaris platform")
}
container, err := daemon.GetContainer(containerName)
if err != nil {
if force {
return daemon.ForceEndpointDelete(containerName, networkName)
}
return err
}
return daemon.DisconnectFromNetwork(container, networkName, force)
}
// GetNetworkDriverList returns the list of plugins drivers
// registered for network.
func (daemon *Daemon) GetNetworkDriverList() []string {
if !daemon.NetworkControllerEnabled() {
return nil
}
pluginList := daemon.netController.BuiltinDrivers()
managedPlugins := daemon.PluginStore.GetAllManagedPluginsByCap(driverapi.NetworkPluginEndpointType)
for _, plugin := range managedPlugins {
pluginList = append(pluginList, plugin.Name())
}
pluginMap := make(map[string]bool)
for _, plugin := range pluginList {
pluginMap[plugin] = true
}
networks := daemon.netController.Networks()
for _, network := range networks {
if !pluginMap[network.Type()] {
pluginList = append(pluginList, network.Type())
pluginMap[network.Type()] = true
}
}
sort.Strings(pluginList)
return pluginList
}
// DeleteManagedNetwork deletes an agent network.
func (daemon *Daemon) DeleteManagedNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, true)
}
// DeleteNetwork destroys a network unless it's one of docker's predefined networks.
func (daemon *Daemon) DeleteNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, false)
}
func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error {
nw, err := daemon.FindNetwork(networkID)
if err != nil {
return err
}
if runconfig.IsPreDefinedNetwork(nw.Name()) && !dynamic {
err := fmt.Errorf("%s is a pre-defined network and cannot be removed", nw.Name())
return apierrors.NewRequestForbiddenError(err)
}
if err := nw.Delete(); err != nil {
return err
}
daemon.pluginRefCount(nw.Type(), driverapi.NetworkPluginEndpointType, plugingetter.Release)
ipamType, _, _, _ := nw.Info().IpamConfig()
daemon.pluginRefCount(ipamType, ipamapi.PluginEndpointType, plugingetter.Release)
daemon.LogNetworkEvent(nw, "destroy")
return nil
}
// GetNetworks returns a list of all networks
func (daemon *Daemon) GetNetworks() []libnetwork.Network {
return daemon.getAllNetworks()
}
// clearAttachableNetworks removes the attachable networks
// after disconnecting any connected container
func (daemon *Daemon) clearAttachableNetworks() {
for _, n := range daemon.GetNetworks() {
if !n.Info().Attachable() {
continue
}
for _, ep := range n.Endpoints() {
epInfo := ep.Info()
if epInfo == nil {
continue
}
sb := epInfo.Sandbox()
if sb == nil {
continue
}
containerID := sb.ContainerID()
if err := daemon.DisconnectContainerFromNetwork(containerID, n.ID(), true); err != nil {
logrus.Warnf("Failed to disconnect container %s from swarm network %s on cluster leave: %v",
containerID, n.Name(), err)
}
}
if err := daemon.DeleteManagedNetwork(n.ID()); err != nil {
logrus.Warnf("Failed to remove swarm network %s on cluster leave: %v", n.Name(), err)
}
}
}