mirror of
synced 2022-11-09 12:21:53 -05:00

This commit in conjunction with a libnetwork side commit, cleans up the libnetwork SetClusterProvider logic interaction. The previous code was inducing libnetwork to spawn several go routines that were racing between each other during the agent init and close. A test got added to verify that back to back swarm init and leave are properly processed and not raise crashes Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
542 lines
16 KiB
542 lines
16 KiB
package daemon
import (
apierrors "github.com/docker/docker/api/errors"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
lncluster "github.com/docker/libnetwork/cluster"
networktypes "github.com/docker/libnetwork/types"
// NetworkControllerEnabled checks if the networking stack is enabled.
// This feature depends on OS primitives and it's disabled in systems like Windows.
func (daemon *Daemon) NetworkControllerEnabled() bool {
return daemon.netController != nil
// FindNetwork function finds a network for a given string that can represent network name or id
func (daemon *Daemon) FindNetwork(idName string) (libnetwork.Network, error) {
// Find by Name
n, err := daemon.GetNetworkByName(idName)
if err != nil && !isNoSuchNetworkError(err) {
return nil, err
if n != nil {
return n, nil
// Find by id
return daemon.GetNetworkByID(idName)
func isNoSuchNetworkError(err error) bool {
_, ok := err.(libnetwork.ErrNoSuchNetwork)
return ok
// GetNetworkByID function returns a network whose ID begins with the given prefix.
// It fails with an error if no matching, or more than one matching, networks are found.
func (daemon *Daemon) GetNetworkByID(partialID string) (libnetwork.Network, error) {
list := daemon.GetNetworksByID(partialID)
if len(list) == 0 {
return nil, libnetwork.ErrNoSuchNetwork(partialID)
if len(list) > 1 {
return nil, libnetwork.ErrInvalidID(partialID)
return list[0], nil
// GetNetworkByName function returns a network for a given network name.
// If no network name is given, the default network is returned.
func (daemon *Daemon) GetNetworkByName(name string) (libnetwork.Network, error) {
c := daemon.netController
if c == nil {
return nil, libnetwork.ErrNoSuchNetwork(name)
if name == "" {
name = c.Config().Daemon.DefaultNetwork
return c.NetworkByName(name)
// GetNetworksByID returns a list of networks whose ID partially matches zero or more networks
func (daemon *Daemon) GetNetworksByID(partialID string) []libnetwork.Network {
c := daemon.netController
if c == nil {
return nil
list := []libnetwork.Network{}
l := func(nw libnetwork.Network) bool {
if strings.HasPrefix(nw.ID(), partialID) {
list = append(list, nw)
return false
return list
// getAllNetworks returns a list containing all networks
func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
return daemon.netController.Networks()
type ingressJob struct {
create *clustertypes.NetworkCreateRequest
ip net.IP
jobDone chan struct{}
var (
ingressWorkerOnce sync.Once
ingressJobsChannel chan *ingressJob
ingressID string
func (daemon *Daemon) startIngressWorker() {
ingressJobsChannel = make(chan *ingressJob, 100)
go func() {
for {
select {
case r := <-ingressJobsChannel:
if r.create != nil {
daemon.setupIngress(r.create, r.ip, ingressID)
ingressID = r.create.ID
} else {
ingressID = ""
// enqueueIngressJob adds a ingress add/rm request to the worker queue.
// It guarantees the worker is started.
func (daemon *Daemon) enqueueIngressJob(job *ingressJob) {
ingressJobsChannel <- job
// SetupIngress setups ingress networking.
// The function returns a channel which will signal the caller when the programming is completed.
func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) (<-chan struct{}, error) {
ip, _, err := net.ParseCIDR(nodeIP)
if err != nil {
return nil, err
done := make(chan struct{})
daemon.enqueueIngressJob(&ingressJob{&create, ip, done})
return done, nil
// ReleaseIngress releases the ingress networking.
// The function returns a channel which will signal the caller when the programming is completed.
func (daemon *Daemon) ReleaseIngress() (<-chan struct{}, error) {
done := make(chan struct{})
daemon.enqueueIngressJob(&ingressJob{nil, nil, done})
return done, nil
func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) {
controller := daemon.netController
if staleID != "" && staleID != create.ID {
if _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true); err != nil {
// If it is any other error other than already
// exists error log error and return.
if _, ok := err.(libnetwork.NetworkNameError); !ok {
logrus.Errorf("Failed creating ingress network: %v", err)
// Otherwise continue down the call to create or recreate sandbox.
n, err := daemon.GetNetworkByID(create.ID)
if err != nil {
logrus.Errorf("Failed getting ingress network by id after creating: %v", err)
sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress())
if err != nil {
if _, ok := err.(networktypes.ForbiddenError); !ok {
logrus.Errorf("Failed creating ingress sandbox: %v", err)
ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil))
if err != nil {
logrus.Errorf("Failed creating ingress endpoint: %v", err)
if err := ep.Join(sb, nil); err != nil {
logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
if err := sb.EnableService(); err != nil {
logrus.Errorf("Failed enabling service for ingress sandbox")
func (daemon *Daemon) releaseIngress(id string) {
controller := daemon.netController
if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
logrus.Errorf("Failed to delete ingress sandbox: %v", err)
if id == "" {
n, err := controller.NetworkByID(id)
if err != nil {
logrus.Errorf("failed to retrieve ingress network %s: %v", id, err)
for _, ep := range n.Endpoints() {
if err := ep.Delete(true); err != nil {
logrus.Errorf("Failed to delete endpoint %s (%s): %v", ep.Name(), ep.ID(), err)
if err := n.Delete(); err != nil {
logrus.Errorf("Failed to delete ingress network %s: %v", n.ID(), err)
// SetNetworkBootstrapKeys sets the bootstrap keys.
func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error {
err := daemon.netController.SetKeys(keys)
if err == nil {
// Upon successful key setting dispatch the keys available event
return err
// UpdateAttachment notifies the attacher about the attachment config.
func (daemon *Daemon) UpdateAttachment(networkName, networkID, containerID string, config *network.NetworkingConfig) error {
if daemon.clusterProvider == nil {
return fmt.Errorf("cluster provider is not initialized")
if err := daemon.clusterProvider.UpdateAttachment(networkName, containerID, config); err != nil {
return daemon.clusterProvider.UpdateAttachment(networkID, containerID, config)
return nil
// WaitForDetachment makes the cluster manager wait for detachment of
// the container from the network.
func (daemon *Daemon) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
if daemon.clusterProvider == nil {
return fmt.Errorf("cluster provider is not initialized")
return daemon.clusterProvider.WaitForDetachment(ctx, networkName, networkID, taskID, containerID)
// CreateManagedNetwork creates an agent network.
func (daemon *Daemon) CreateManagedNetwork(create clustertypes.NetworkCreateRequest) error {
_, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true)
return err
// CreateNetwork creates a network with the given name, driver and other optional parameters
func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.NetworkCreateResponse, error) {
resp, err := daemon.createNetwork(create, "", false)
if err != nil {
return nil, err
return resp, err
func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) {
if runconfig.IsPreDefinedNetwork(create.Name) && !agent {
err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name)
return nil, apierrors.NewRequestForbiddenError(err)
var warning string
nw, err := daemon.GetNetworkByName(create.Name)
if err != nil {
if _, ok := err.(libnetwork.ErrNoSuchNetwork); !ok {
return nil, err
if nw != nil {
// check if user defined CheckDuplicate, if set true, return err
// otherwise prepare a warning message
if create.CheckDuplicate {
return nil, libnetwork.NetworkNameError(create.Name)
warning = fmt.Sprintf("Network with name %s (id : %s) already exists", nw.Name(), nw.ID())
c := daemon.netController
driver := create.Driver
if driver == "" {
driver = c.Config().Daemon.DefaultDriver
nwOptions := []libnetwork.NetworkOption{
if create.IPAM != nil {
ipam := create.IPAM
v4Conf, v6Conf, err := getIpamConfig(ipam.Config)
if err != nil {
return nil, err
nwOptions = append(nwOptions, libnetwork.NetworkOptionIpam(ipam.Driver, "", v4Conf, v6Conf, ipam.Options))
if create.Internal {
nwOptions = append(nwOptions, libnetwork.NetworkOptionInternalNetwork())
if agent {
nwOptions = append(nwOptions, libnetwork.NetworkOptionDynamic())
nwOptions = append(nwOptions, libnetwork.NetworkOptionPersist(false))
n, err := c.NewNetwork(driver, create.Name, id, nwOptions...)
if err != nil {
if _, ok := err.(libnetwork.ErrDataStoreNotInitialized); ok {
return nil, errors.New("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
return nil, err
daemon.pluginRefCount(driver, driverapi.NetworkPluginEndpointType, plugingetter.Acquire)
if create.IPAM != nil {
daemon.pluginRefCount(create.IPAM.Driver, ipamapi.PluginEndpointType, plugingetter.Acquire)
daemon.LogNetworkEvent(n, "create")
return &types.NetworkCreateResponse{
ID: n.ID(),
Warning: warning,
}, nil
func (daemon *Daemon) pluginRefCount(driver, capability string, mode int) {
var builtinDrivers []string
if capability == driverapi.NetworkPluginEndpointType {
builtinDrivers = daemon.netController.BuiltinDrivers()
} else if capability == ipamapi.PluginEndpointType {
builtinDrivers = daemon.netController.BuiltinIPAMDrivers()
for _, d := range builtinDrivers {
if d == driver {
if daemon.PluginStore != nil {
_, err := daemon.PluginStore.Get(driver, capability, mode)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{"mode": mode, "driver": driver}).Error("Error handling plugin refcount operation")
func getIpamConfig(data []network.IPAMConfig) ([]*libnetwork.IpamConf, []*libnetwork.IpamConf, error) {
ipamV4Cfg := []*libnetwork.IpamConf{}
ipamV6Cfg := []*libnetwork.IpamConf{}
for _, d := range data {
iCfg := libnetwork.IpamConf{}
iCfg.PreferredPool = d.Subnet
iCfg.SubPool = d.IPRange
iCfg.Gateway = d.Gateway
iCfg.AuxAddresses = d.AuxAddress
ip, _, err := net.ParseCIDR(d.Subnet)
if err != nil {
return nil, nil, fmt.Errorf("Invalid subnet %s : %v", d.Subnet, err)
if ip.To4() != nil {
ipamV4Cfg = append(ipamV4Cfg, &iCfg)
} else {
ipamV6Cfg = append(ipamV6Cfg, &iCfg)
return ipamV4Cfg, ipamV6Cfg, nil
// UpdateContainerServiceConfig updates a service configuration.
func (daemon *Daemon) UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error {
container, err := daemon.GetContainer(containerName)
if err != nil {
return err
container.NetworkSettings.Service = serviceConfig
return nil
// ConnectContainerToNetwork connects the given container to the given
// network. If either cannot be found, an err is returned. If the
// network cannot be set up, an err is returned.
func (daemon *Daemon) ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error {
if runtime.GOOS == "solaris" {
return errors.New("docker network connect is unsupported on Solaris platform")
container, err := daemon.GetContainer(containerName)
if err != nil {
return err
return daemon.ConnectToNetwork(container, networkName, endpointConfig)
// DisconnectContainerFromNetwork disconnects the given container from
// the given network. If either cannot be found, an err is returned.
func (daemon *Daemon) DisconnectContainerFromNetwork(containerName string, networkName string, force bool) error {
if runtime.GOOS == "solaris" {
return errors.New("docker network disconnect is unsupported on Solaris platform")
container, err := daemon.GetContainer(containerName)
if err != nil {
if force {
return daemon.ForceEndpointDelete(containerName, networkName)
return err
return daemon.DisconnectFromNetwork(container, networkName, force)
// GetNetworkDriverList returns the list of plugins drivers
// registered for network.
func (daemon *Daemon) GetNetworkDriverList() []string {
if !daemon.NetworkControllerEnabled() {
return nil
pluginList := daemon.netController.BuiltinDrivers()
managedPlugins := daemon.PluginStore.GetAllManagedPluginsByCap(driverapi.NetworkPluginEndpointType)
for _, plugin := range managedPlugins {
pluginList = append(pluginList, plugin.Name())
pluginMap := make(map[string]bool)
for _, plugin := range pluginList {
pluginMap[plugin] = true
networks := daemon.netController.Networks()
for _, network := range networks {
if !pluginMap[network.Type()] {
pluginList = append(pluginList, network.Type())
pluginMap[network.Type()] = true
return pluginList
// DeleteManagedNetwork deletes an agent network.
func (daemon *Daemon) DeleteManagedNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, true)
// DeleteNetwork destroys a network unless it's one of docker's predefined networks.
func (daemon *Daemon) DeleteNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, false)
func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error {
nw, err := daemon.FindNetwork(networkID)
if err != nil {
return err
if runconfig.IsPreDefinedNetwork(nw.Name()) && !dynamic {
err := fmt.Errorf("%s is a pre-defined network and cannot be removed", nw.Name())
return apierrors.NewRequestForbiddenError(err)
if err := nw.Delete(); err != nil {
return err
daemon.pluginRefCount(nw.Type(), driverapi.NetworkPluginEndpointType, plugingetter.Release)
ipamType, _, _, _ := nw.Info().IpamConfig()
daemon.pluginRefCount(ipamType, ipamapi.PluginEndpointType, plugingetter.Release)
daemon.LogNetworkEvent(nw, "destroy")
return nil
// GetNetworks returns a list of all networks
func (daemon *Daemon) GetNetworks() []libnetwork.Network {
return daemon.getAllNetworks()
// clearAttachableNetworks removes the attachable networks
// after disconnecting any connected container
func (daemon *Daemon) clearAttachableNetworks() {
for _, n := range daemon.GetNetworks() {
if !n.Info().Attachable() {
for _, ep := range n.Endpoints() {
epInfo := ep.Info()
if epInfo == nil {
sb := epInfo.Sandbox()
if sb == nil {
containerID := sb.ContainerID()
if err := daemon.DisconnectContainerFromNetwork(containerID, n.ID(), true); err != nil {
logrus.Warnf("Failed to disconnect container %s from swarm network %s on cluster leave: %v",
containerID, n.Name(), err)
if err := daemon.DeleteManagedNetwork(n.ID()); err != nil {
logrus.Warnf("Failed to remove swarm network %s on cluster leave: %v", n.Name(), err)