1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #32717 from fcrisciani/data_path

Data path traffic separation option in swarm mode
This commit is contained in:
Brian Goff 2017-04-27 13:00:55 -04:00 committed by GitHub
commit 0307fe1a0b
20 changed files with 197 additions and 66 deletions

View file

@ -132,6 +132,7 @@ type ExternalCA struct {
type InitRequest struct { type InitRequest struct {
ListenAddr string ListenAddr string
AdvertiseAddr string AdvertiseAddr string
DataPathAddr string
ForceNewCluster bool ForceNewCluster bool
Spec Spec Spec Spec
AutoLockManagers bool AutoLockManagers bool
@ -142,6 +143,7 @@ type InitRequest struct {
type JoinRequest struct { type JoinRequest struct {
ListenAddr string ListenAddr string
AdvertiseAddr string AdvertiseAddr string
DataPathAddr string
RemoteAddrs []string RemoteAddrs []string
JoinToken string // accept by secret JoinToken string // accept by secret
Availability NodeAvailability Availability NodeAvailability

View file

@ -19,6 +19,7 @@ type initOptions struct {
listenAddr NodeAddrOption listenAddr NodeAddrOption
// Not a NodeAddrOption because it has no default port. // Not a NodeAddrOption because it has no default port.
advertiseAddr string advertiseAddr string
dataPathAddr string
forceNewCluster bool forceNewCluster bool
availability string availability string
} }
@ -40,6 +41,7 @@ func newInitCommand(dockerCli command.Cli) *cobra.Command {
flags := cmd.Flags() flags := cmd.Flags()
flags.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])") flags.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])")
flags.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])") flags.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])")
flags.StringVar(&opts.dataPathAddr, flagDataPathAddr, "", "Address or interface to use for data path traffic (format: <ip|interface>)")
flags.BoolVar(&opts.forceNewCluster, "force-new-cluster", false, "Force create a new cluster from current state") flags.BoolVar(&opts.forceNewCluster, "force-new-cluster", false, "Force create a new cluster from current state")
flags.BoolVar(&opts.autolock, flagAutolock, false, "Enable manager autolocking (requiring an unlock key to start a stopped manager)") flags.BoolVar(&opts.autolock, flagAutolock, false, "Enable manager autolocking (requiring an unlock key to start a stopped manager)")
flags.StringVar(&opts.availability, flagAvailability, "active", `Availability of the node ("active"|"pause"|"drain")`) flags.StringVar(&opts.availability, flagAvailability, "active", `Availability of the node ("active"|"pause"|"drain")`)
@ -54,6 +56,7 @@ func runInit(dockerCli command.Cli, flags *pflag.FlagSet, opts initOptions) erro
req := swarm.InitRequest{ req := swarm.InitRequest{
ListenAddr: opts.listenAddr.String(), ListenAddr: opts.listenAddr.String(),
AdvertiseAddr: opts.advertiseAddr, AdvertiseAddr: opts.advertiseAddr,
DataPathAddr: opts.dataPathAddr,
ForceNewCluster: opts.forceNewCluster, ForceNewCluster: opts.forceNewCluster,
Spec: opts.swarmOptions.ToSpec(flags), Spec: opts.swarmOptions.ToSpec(flags),
AutoLockManagers: opts.swarmOptions.autolock, AutoLockManagers: opts.swarmOptions.autolock,

View file

@ -19,6 +19,7 @@ type joinOptions struct {
listenAddr NodeAddrOption listenAddr NodeAddrOption
// Not a NodeAddrOption because it has no default port. // Not a NodeAddrOption because it has no default port.
advertiseAddr string advertiseAddr string
dataPathAddr string
token string token string
availability string availability string
} }
@ -41,6 +42,7 @@ func newJoinCommand(dockerCli command.Cli) *cobra.Command {
flags := cmd.Flags() flags := cmd.Flags()
flags.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])") flags.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])")
flags.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])") flags.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])")
flags.StringVar(&opts.dataPathAddr, flagDataPathAddr, "", "Address or interface to use for data path traffic (format: <ip|interface>)")
flags.StringVar(&opts.token, flagToken, "", "Token for entry into the swarm") flags.StringVar(&opts.token, flagToken, "", "Token for entry into the swarm")
flags.StringVar(&opts.availability, flagAvailability, "active", `Availability of the node ("active"|"pause"|"drain")`) flags.StringVar(&opts.availability, flagAvailability, "active", `Availability of the node ("active"|"pause"|"drain")`)
return cmd return cmd
@ -54,6 +56,7 @@ func runJoin(dockerCli command.Cli, flags *pflag.FlagSet, opts joinOptions) erro
JoinToken: opts.token, JoinToken: opts.token,
ListenAddr: opts.listenAddr.String(), ListenAddr: opts.listenAddr.String(),
AdvertiseAddr: opts.advertiseAddr, AdvertiseAddr: opts.advertiseAddr,
DataPathAddr: opts.dataPathAddr,
RemoteAddrs: []string{opts.remote}, RemoteAddrs: []string{opts.remote},
} }
if flags.Changed(flagAvailability) { if flags.Changed(flagAvailability) {

View file

@ -19,6 +19,7 @@ const (
flagDispatcherHeartbeat = "dispatcher-heartbeat" flagDispatcherHeartbeat = "dispatcher-heartbeat"
flagListenAddr = "listen-addr" flagListenAddr = "listen-addr"
flagAdvertiseAddr = "advertise-addr" flagAdvertiseAddr = "advertise-addr"
flagDataPathAddr = "data-path-addr"
flagQuiet = "quiet" flagQuiet = "quiet"
flagRotate = "rotate" flagRotate = "rotate"
flagToken = "token" flagToken = "token"

View file

@ -3301,7 +3301,7 @@ _docker_swarm_init() {
case "$cur" in case "$cur" in
-*) -*)
COMPREPLY=( $( compgen -W "--advertise-addr --autolock --availability --cert-expiry --dispatcher-heartbeat --external-ca --force-new-cluster --help --listen-addr --max-snapshots --snapshot-interval --task-history-limit" -- "$cur" ) ) COMPREPLY=( $( compgen -W "--advertise-addr --data-path-addr --autolock --availability --cert-expiry --dispatcher-heartbeat --external-ca --force-new-cluster --help --listen-addr --max-snapshots --snapshot-interval --task-history-limit" -- "$cur" ) )
;; ;;
esac esac
} }
@ -3337,7 +3337,7 @@ _docker_swarm_join() {
case "$cur" in case "$cur" in
-*) -*)
COMPREPLY=( $( compgen -W "--advertise-addr --availability --help --listen-addr --token" -- "$cur" ) ) COMPREPLY=( $( compgen -W "--advertise-addr --data-path-addr --availability --help --listen-addr --token" -- "$cur" ) )
;; ;;
*:) *:)
COMPREPLY=( $( compgen -W "2377" -- "${cur##*:}" ) ) COMPREPLY=( $( compgen -W "2377" -- "${cur##*:}" ) )

View file

@ -2267,6 +2267,7 @@ __docker_swarm_subcommand() {
_arguments $(__docker_arguments) \ _arguments $(__docker_arguments) \
$opts_help \ $opts_help \
"($help)--advertise-addr=[Advertised address]:ip\:port: " \ "($help)--advertise-addr=[Advertised address]:ip\:port: " \
"($help)--data-path-addr=[Data path IP or interface]:ip " \
"($help)--autolock[Enable manager autolocking]" \ "($help)--autolock[Enable manager autolocking]" \
"($help)--availability=[Availability of the node]:availability:(active drain pause)" \ "($help)--availability=[Availability of the node]:availability:(active drain pause)" \
"($help)--cert-expiry=[Validity period for node certificates]:duration: " \ "($help)--cert-expiry=[Validity period for node certificates]:duration: " \
@ -2282,6 +2283,7 @@ __docker_swarm_subcommand() {
_arguments $(__docker_arguments) -A '-*' \ _arguments $(__docker_arguments) -A '-*' \
$opts_help \ $opts_help \
"($help)--advertise-addr=[Advertised address]:ip\:port: " \ "($help)--advertise-addr=[Advertised address]:ip\:port: " \
"($help)--data-path-addr=[Data path IP or interface]:ip " \
"($help)--availability=[Availability of the node]:availability:(active drain pause)" \ "($help)--availability=[Availability of the node]:availability:(active drain pause)" \
"($help)--listen-addr=[Listen address]:ip\:port: " \ "($help)--listen-addr=[Listen address]:ip\:port: " \
"($help)--token=[Token for entry into the swarm]:secret: " \ "($help)--token=[Token for entry into the swarm]:secret: " \

View file

@ -270,6 +270,16 @@ func (c *Cluster) GetAdvertiseAddress() string {
return c.currentNodeState().actualLocalAddr return c.currentNodeState().actualLocalAddr
} }
// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
c.mu.RLock()
defer c.mu.RUnlock()
if c.nr != nil {
return c.nr.config.DataPathAddr
}
return ""
}
// GetRemoteAddress returns a known advertise address of a remote manager if // GetRemoteAddress returns a known advertise address of a remote manager if
// available. // available.
// todo: change to array/connect with info // todo: change to array/connect with info

View file

@ -10,8 +10,10 @@ var (
errNoSuchInterface = errors.New("no such interface") errNoSuchInterface = errors.New("no such interface")
errNoIP = errors.New("could not find the system's IP address") errNoIP = errors.New("could not find the system's IP address")
errMustSpecifyListenAddr = errors.New("must specify a listening address because the address to advertise is not recognized as a system address, and a system's IP address to use could not be uniquely identified") errMustSpecifyListenAddr = errors.New("must specify a listening address because the address to advertise is not recognized as a system address, and a system's IP address to use could not be uniquely identified")
errBadNetworkIdentifier = errors.New("must specify a valid IP address or interface name")
errBadListenAddr = errors.New("listen address must be an IP address or network interface (with optional port number)") errBadListenAddr = errors.New("listen address must be an IP address or network interface (with optional port number)")
errBadAdvertiseAddr = errors.New("advertise address must be a non-zero IP address or network interface (with optional port number)") errBadAdvertiseAddr = errors.New("advertise address must be a non-zero IP address or network interface (with optional port number)")
errBadDataPathAddr = errors.New("data path address must be a non-zero IP address or network interface (without a port number)")
errBadDefaultAdvertiseAddr = errors.New("default advertise address must be a non-zero IP address or network interface (without a port number)") errBadDefaultAdvertiseAddr = errors.New("default advertise address must be a non-zero IP address or network interface (without a port number)")
) )
@ -20,23 +22,17 @@ func resolveListenAddr(specifiedAddr string) (string, string, error) {
if err != nil { if err != nil {
return "", "", fmt.Errorf("could not parse listen address %s", specifiedAddr) return "", "", fmt.Errorf("could not parse listen address %s", specifiedAddr)
} }
// Does the host component match any of the interface names on the // Does the host component match any of the interface names on the
// system? If so, use the address from that interface. // system? If so, use the address from that interface.
interfaceAddr, err := resolveInterfaceAddr(specifiedHost) specifiedIP, err := resolveInputIPAddr(specifiedHost, true)
if err == nil { if err != nil {
return interfaceAddr.String(), specifiedPort, nil if err == errBadNetworkIdentifier {
err = errBadListenAddr
} }
if err != errNoSuchInterface {
return "", "", err return "", "", err
} }
// If it's not an interface, it must be an IP (for now) return specifiedIP.String(), specifiedPort, nil
if net.ParseIP(specifiedHost) == nil {
return "", "", errBadListenAddr
}
return specifiedHost, specifiedPort, nil
} }
func (c *Cluster) resolveAdvertiseAddr(advertiseAddr, listenAddrPort string) (string, string, error) { func (c *Cluster) resolveAdvertiseAddr(advertiseAddr, listenAddrPort string) (string, string, error) {
@ -57,43 +53,32 @@ func (c *Cluster) resolveAdvertiseAddr(advertiseAddr, listenAddrPort string) (st
advertiseHost = advertiseAddr advertiseHost = advertiseAddr
advertisePort = listenAddrPort advertisePort = listenAddrPort
} }
// Does the host component match any of the interface names on the // Does the host component match any of the interface names on the
// system? If so, use the address from that interface. // system? If so, use the address from that interface.
interfaceAddr, err := resolveInterfaceAddr(advertiseHost) advertiseIP, err := resolveInputIPAddr(advertiseHost, false)
if err == nil { if err != nil {
return interfaceAddr.String(), advertisePort, nil if err == errBadNetworkIdentifier {
err = errBadAdvertiseAddr
} }
if err != errNoSuchInterface {
return "", "", err return "", "", err
} }
// If it's not an interface, it must be an IP (for now) return advertiseIP.String(), advertisePort, nil
if ip := net.ParseIP(advertiseHost); ip == nil || ip.IsUnspecified() {
return "", "", errBadAdvertiseAddr
}
return advertiseHost, advertisePort, nil
} }
if c.config.DefaultAdvertiseAddr != "" { if c.config.DefaultAdvertiseAddr != "" {
// Does the default advertise address component match any of the // Does the default advertise address component match any of the
// interface names on the system? If so, use the address from // interface names on the system? If so, use the address from
// that interface. // that interface.
interfaceAddr, err := resolveInterfaceAddr(c.config.DefaultAdvertiseAddr) defaultAdvertiseIP, err := resolveInputIPAddr(c.config.DefaultAdvertiseAddr, false)
if err == nil { if err != nil {
return interfaceAddr.String(), listenAddrPort, nil if err == errBadNetworkIdentifier {
err = errBadDefaultAdvertiseAddr
} }
if err != errNoSuchInterface {
return "", "", err return "", "", err
} }
// If it's not an interface, it must be an IP (for now) return defaultAdvertiseIP.String(), listenAddrPort, nil
if ip := net.ParseIP(c.config.DefaultAdvertiseAddr); ip == nil || ip.IsUnspecified() {
return "", "", errBadDefaultAdvertiseAddr
}
return c.config.DefaultAdvertiseAddr, listenAddrPort, nil
} }
systemAddr, err := c.resolveSystemAddr() systemAddr, err := c.resolveSystemAddr()
@ -103,6 +88,22 @@ func (c *Cluster) resolveAdvertiseAddr(advertiseAddr, listenAddrPort string) (st
return systemAddr.String(), listenAddrPort, nil return systemAddr.String(), listenAddrPort, nil
} }
func resolveDataPathAddr(dataPathAddr string) (string, error) {
if dataPathAddr == "" {
// dataPathAddr is not defined
return "", nil
}
// If a data path flag is specified try to resolve the IP address.
dataPathIP, err := resolveInputIPAddr(dataPathAddr, false)
if err != nil {
if err == errBadNetworkIdentifier {
err = errBadDataPathAddr
}
return "", err
}
return dataPathIP.String(), nil
}
func resolveInterfaceAddr(specifiedInterface string) (net.IP, error) { func resolveInterfaceAddr(specifiedInterface string) (net.IP, error) {
// Use a specific interface's IP address. // Use a specific interface's IP address.
intf, err := net.InterfaceByName(specifiedInterface) intf, err := net.InterfaceByName(specifiedInterface)
@ -149,6 +150,30 @@ func resolveInterfaceAddr(specifiedInterface string) (net.IP, error) {
return interfaceAddr6, nil return interfaceAddr6, nil
} }
// resolveInputIPAddr tries to resolve the IP address from the string passed as input
// - tries to match the string as an interface name, if so returns the IP address associated with it
// - on failure of previous step tries to parse the string as an IP address itself
// if succeeds returns the IP address
func resolveInputIPAddr(input string, isUnspecifiedValid bool) (net.IP, error) {
// Try to see if it is an interface name
interfaceAddr, err := resolveInterfaceAddr(input)
if err == nil {
return interfaceAddr, nil
}
// String matched interface but there is a potential ambiguity to be resolved
if err != errNoSuchInterface {
return nil, err
}
// String is not an interface check if it is a valid IP
if ip := net.ParseIP(input); ip != nil && (isUnspecifiedValid || !ip.IsUnspecified()) {
return ip, nil
}
// Not valid IP found
return nil, errBadNetworkIdentifier
}
func (c *Cluster) resolveSystemAddrViaSubnetCheck() (net.IP, error) { func (c *Cluster) resolveSystemAddrViaSubnetCheck() (net.IP, error) {
// Use the system's only IP address, or fail if there are // Use the system's only IP address, or fail if there are
// multiple addresses to choose from. Skip interfaces which // multiple addresses to choose from. Skip interfaces which

View file

@ -47,6 +47,9 @@ type nodeStartConfig struct {
// AdvertiseAddr is the address other nodes should connect to, // AdvertiseAddr is the address other nodes should connect to,
// including a port. // including a port.
AdvertiseAddr string AdvertiseAddr string
// DataPathAddr is the address that has to be used for the data path
DataPathAddr string
joinAddr string joinAddr string
forceNewCluster bool forceNewCluster bool
joinToken string joinToken string

View file

@ -54,6 +54,11 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
return "", err return "", err
} }
dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
if err != nil {
return "", err
}
localAddr := listenHost localAddr := listenHost
// If the local address is undetermined, the advertise address // If the local address is undetermined, the advertise address
@ -93,6 +98,7 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) {
LocalAddr: localAddr, LocalAddr: localAddr,
ListenAddr: net.JoinHostPort(listenHost, listenPort), ListenAddr: net.JoinHostPort(listenHost, listenPort),
AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort), AdvertiseAddr: net.JoinHostPort(advertiseHost, advertisePort),
DataPathAddr: dataPathAddr,
availability: req.Availability, availability: req.Availability,
}) })
if err != nil { if err != nil {
@ -155,12 +161,18 @@ func (c *Cluster) Join(req types.JoinRequest) error {
} }
} }
dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
if err != nil {
return err
}
clearPersistentState(c.root) clearPersistentState(c.root)
nr, err := c.newNodeRunner(nodeStartConfig{ nr, err := c.newNodeRunner(nodeStartConfig{
RemoteAddr: req.RemoteAddrs[0], RemoteAddr: req.RemoteAddrs[0],
ListenAddr: net.JoinHostPort(listenHost, listenPort), ListenAddr: net.JoinHostPort(listenHost, listenPort),
AdvertiseAddr: advertiseAddr, AdvertiseAddr: advertiseAddr,
DataPathAddr: dataPathAddr,
joinAddr: req.RemoteAddrs[0], joinAddr: req.RemoteAddrs[0],
joinToken: req.JoinToken, joinToken: req.JoinToken,
availability: req.Availability, availability: req.Availability,

View file

@ -25,6 +25,7 @@ Options:
--autolock Enable manager autolocking (requiring an unlock key to start a stopped manager) --autolock Enable manager autolocking (requiring an unlock key to start a stopped manager)
--availability string Availability of the node ("active"|"pause"|"drain") (default "active") --availability string Availability of the node ("active"|"pause"|"drain") (default "active")
--cert-expiry duration Validity period for node certificates (ns|us|ms|s|m|h) (default 2160h0m0s) --cert-expiry duration Validity period for node certificates (ns|us|ms|s|m|h) (default 2160h0m0s)
--data-path-addr string Address or interface to use for data path traffic (format: <ip|interface>)
--dispatcher-heartbeat duration Dispatcher heartbeat period (ns|us|ms|s|m|h) (default 5s) --dispatcher-heartbeat duration Dispatcher heartbeat period (ns|us|ms|s|m|h) (default 5s)
--external-ca external-ca Specifications of one or more certificate signing endpoints --external-ca external-ca Specifications of one or more certificate signing endpoints
--force-new-cluster Force create a new cluster from current state --force-new-cluster Force create a new cluster from current state
@ -118,6 +119,15 @@ for example `--advertise-addr eth0:2377`.
Specifying a port is optional. If the value is a bare IP address or interface Specifying a port is optional. If the value is a bare IP address or interface
name, the default port 2377 will be used. name, the default port 2377 will be used.
### `--data-path-addr`
This flag specifies the address that global scope network drivers will publish towards
other nodes in order to reach the containers running on this node.
Using this parameter it is then possible to separate the container's data traffic from the
management traffic of the cluster.
If unspecified, Docker will use the same IP address or interface that is used for the
advertise address.
### `--task-history-limit` ### `--task-history-limit`
This flag sets up task history retention limit. This flag sets up task history retention limit.

View file

@ -23,6 +23,7 @@ Join a swarm as a node and/or manager
Options: Options:
--advertise-addr string Advertised address (format: <ip|interface>[:port]) --advertise-addr string Advertised address (format: <ip|interface>[:port])
--availability string Availability of the node ("active"|"pause"|"drain") (default "active") --availability string Availability of the node ("active"|"pause"|"drain") (default "active")
--data-path-addr string Address or interface to use for data path traffic (format: <ip|interface>)
--help Print usage --help Print usage
--listen-addr node-addr Listen address (format: <ip|interface>[:port]) (default 0.0.0.0:2377) --listen-addr node-addr Listen address (format: <ip|interface>[:port]) (default 0.0.0.0:2377)
--token string Token for entry into the swarm --token string Token for entry into the swarm
@ -95,6 +96,15 @@ name, the default port 2377 will be used.
This flag is generally not necessary when joining an existing swarm. This flag is generally not necessary when joining an existing swarm.
### `--data-path-addr`
This flag specifies the address that global scope network drivers will publish towards
other nodes in order to reach the containers running on this node.
Using this parameter it is then possible to separate the container's data traffic from the
management traffic of the cluster.
If unspecified, Docker will use the same IP address or interface that is used for the
advertise address.
### `--token string` ### `--token string`
Secret value required for nodes to join the swarm Secret value required for nodes to join the swarm

View file

@ -1932,3 +1932,15 @@ func (s *DockerSwarmSuite) TestSwarmServiceLsFilterMode(c *check.C) {
c.Assert(out, checker.Contains, "top1") c.Assert(out, checker.Contains, "top1")
c.Assert(out, checker.Not(checker.Contains), "top2") c.Assert(out, checker.Not(checker.Contains), "top2")
} }
func (s *DockerSwarmSuite) TestSwarmInitUnspecifiedDataPathAddr(c *check.C) {
d := s.AddDaemon(c, false, false)
out, err := d.Cmd("swarm", "init", "--data-path-addr", "0.0.0.0")
c.Assert(err, checker.NotNil)
c.Assert(out, checker.Contains, "data path address must be a non-zero IP")
out, err = d.Cmd("swarm", "init", "--data-path-addr", "0.0.0.0:2000")
c.Assert(err, checker.NotNil)
c.Assert(out, checker.Contains, "data path address must be a non-zero IP")
}

View file

@ -24,7 +24,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
github.com/imdario/mergo 0.2.1 github.com/imdario/mergo 0.2.1
#get libnetwork packages #get libnetwork packages
github.com/docker/libnetwork b13e0604016a4944025aaff521d9c125850b0d04 github.com/docker/libnetwork 5dc95a3f9ce4b70bf08492ca37ec55c5b6d84975
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -39,11 +39,21 @@ type agent struct {
networkDB *networkdb.NetworkDB networkDB *networkdb.NetworkDB
bindAddr string bindAddr string
advertiseAddr string advertiseAddr string
dataPathAddr string
epTblCancel func() epTblCancel func()
driverCancelFuncs map[string][]func() driverCancelFuncs map[string][]func()
sync.Mutex sync.Mutex
} }
func (a *agent) dataPathAddress() string {
a.Lock()
defer a.Unlock()
if a.dataPathAddr != "" {
return a.dataPathAddr
}
return a.advertiseAddr
}
const libnetworkEPTable = "endpoint_table" const libnetworkEPTable = "endpoint_table"
func getBindAddr(ifaceName string) (string, error) { func getBindAddr(ifaceName string) (string, error) {
@ -187,16 +197,25 @@ func (c *controller) agentSetup() error {
clusterProvider := c.cfg.Daemon.ClusterProvider clusterProvider := c.cfg.Daemon.ClusterProvider
agent := c.agent agent := c.agent
c.Unlock() c.Unlock()
if clusterProvider == nil {
msg := "Aborting initialization of Libnetwork Agent because cluster provider is now unset"
logrus.Errorf(msg)
return fmt.Errorf(msg)
}
bindAddr := clusterProvider.GetLocalAddress() bindAddr := clusterProvider.GetLocalAddress()
advAddr := clusterProvider.GetAdvertiseAddress() advAddr := clusterProvider.GetAdvertiseAddress()
dataAddr := clusterProvider.GetDataPathAddress()
remote := clusterProvider.GetRemoteAddress() remote := clusterProvider.GetRemoteAddress()
remoteAddr, _, _ := net.SplitHostPort(remote) remoteAddr, _, _ := net.SplitHostPort(remote)
listen := clusterProvider.GetListenAddress() listen := clusterProvider.GetListenAddress()
listenAddr, _, _ := net.SplitHostPort(listen) listenAddr, _, _ := net.SplitHostPort(listen)
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr) logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr=%s",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddr)
if advAddr != "" && agent == nil { if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil { if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err) logrus.Errorf("Error in agentInit : %v", err)
} else { } else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
@ -262,7 +281,7 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
return keys[1].Key, keys[1].LamportTime, nil return keys[1].Key, keys[1].LamportTime, nil
} }
func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr string) error { func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
if !c.isAgent() { if !c.isAgent() {
return nil return nil
} }
@ -296,6 +315,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
networkDB: nDB, networkDB: nDB,
bindAddr: bindAddr, bindAddr: bindAddr,
advertiseAddr: advertiseAddr, advertiseAddr: advertiseAddr,
dataPathAddr: dataPathAddr,
epTblCancel: cancel, epTblCancel: cancel,
driverCancelFuncs: make(map[string][]func()), driverCancelFuncs: make(map[string][]func()),
} }
@ -336,25 +356,22 @@ func (c *controller) agentDriverNotify(d driverapi.Driver) {
return return
} }
d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{ if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
Address: agent.advertiseAddr, Address: agent.dataPathAddress(),
BindAddress: agent.bindAddr, BindAddress: agent.bindAddr,
Self: true, Self: true,
}) }); err != nil {
logrus.Warnf("Failed the node discovery in driver: %v", err)
}
drvEnc := discoverapi.DriverEncryptionConfig{} drvEnc := discoverapi.DriverEncryptionConfig{}
keys, tags := c.getKeys(subsysIPSec) keys, tags := c.getKeys(subsysIPSec)
drvEnc.Keys = keys drvEnc.Keys = keys
drvEnc.Tags = tags drvEnc.Tags = tags
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc) logrus.Warnf("Failed to set datapath keys in driver: %v", err)
if err != nil {
logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
} }
return false
})
} }
func (c *controller) agentClose() { func (c *controller) agentClose() {

View file

@ -12,6 +12,7 @@ type Provider interface {
GetLocalAddress() string GetLocalAddress() string
GetListenAddress() string GetListenAddress() string
GetAdvertiseAddress() string GetAdvertiseAddress() string
GetDataPathAddress() string
GetRemoteAddress() string GetRemoteAddress() string
ListenClusterEvents() <-chan struct{} ListenClusterEvents() <-chan struct{}
AttachNetwork(string, string, []string) (*network.NetworkingConfig, error) AttachNetwork(string, string, []string) (*network.NetworkingConfig, error)

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/types" "github.com/docker/libnetwork/types"
) )
@ -72,9 +73,19 @@ func (sb *sandbox) setupDefaultGW() error {
if err != nil { if err != nil {
return fmt.Errorf("container %s: endpoint create on GW Network failed: %v", sb.containerID, err) return fmt.Errorf("container %s: endpoint create on GW Network failed: %v", sb.containerID, err)
} }
defer func() {
if err != nil {
if err2 := newEp.Delete(true); err2 != nil {
logrus.Warnf("Failed to remove gw endpoint for container %s after failing to join the gateway network: %v",
sb.containerID, err2)
}
}
}()
epLocal := newEp.(*endpoint) epLocal := newEp.(*endpoint)
if err := epLocal.sbJoin(sb); err != nil { if err = epLocal.sbJoin(sb); err != nil {
return fmt.Errorf("container %s: endpoint join on GW Network failed: %v", sb.containerID, err) return fmt.Errorf("container %s: endpoint join on GW Network failed: %v", sb.containerID, err)
} }

View file

@ -427,7 +427,7 @@ func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
return ep.sbJoin(sb, options...) return ep.sbJoin(sb, options...)
} }
func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error { func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) (err error) {
n, err := ep.getNetworkFromStore() n, err := ep.getNetworkFromStore()
if err != nil { if err != nil {
return fmt.Errorf("failed to get network from store during join: %v", err) return fmt.Errorf("failed to get network from store during join: %v", err)
@ -462,7 +462,7 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
d, err := n.driver(true) d, err := n.driver(true)
if err != nil { if err != nil {
return fmt.Errorf("failed to join endpoint: %v", err) return fmt.Errorf("failed to get driver during join: %v", err)
} }
err = d.Join(nid, epid, sb.Key(), ep, sb.Labels()) err = d.Join(nid, epid, sb.Key(), ep, sb.Labels())
@ -471,8 +471,8 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
} }
defer func() { defer func() {
if err != nil { if err != nil {
if err := d.Leave(nid, epid); err != nil { if e := d.Leave(nid, epid); e != nil {
logrus.Warnf("driver leave failed while rolling back join: %v", err) logrus.Warnf("driver leave failed while rolling back join: %v", e)
} }
} }
}() }()
@ -538,11 +538,11 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
logrus.Debugf("Revoking external connectivity on endpoint %s (%s)", extEp.Name(), extEp.ID()) logrus.Debugf("Revoking external connectivity on endpoint %s (%s)", extEp.Name(), extEp.ID())
extN, err := extEp.getNetworkFromStore() extN, err := extEp.getNetworkFromStore()
if err != nil { if err != nil {
return fmt.Errorf("failed to get network from store during join: %v", err) return fmt.Errorf("failed to get network from store for revoking external connectivity during join: %v", err)
} }
extD, err := extN.driver(true) extD, err := extN.driver(true)
if err != nil { if err != nil {
return fmt.Errorf("failed to join endpoint: %v", err) return fmt.Errorf("failed to get driver for revoking external connectivity during join: %v", err)
} }
if err = extD.RevokeExternalConnectivity(extEp.network.ID(), extEp.ID()); err != nil { if err = extD.RevokeExternalConnectivity(extEp.network.ID(), extEp.ID()); err != nil {
return types.InternalErrorf( return types.InternalErrorf(
@ -570,9 +570,9 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
} }
if !sb.needDefaultGW() { if !sb.needDefaultGW() {
if err := sb.clearDefaultGW(); err != nil { if e := sb.clearDefaultGW(); e != nil {
logrus.Warnf("Failure while disconnecting sandbox %s (%s) from gateway network: %v", logrus.Warnf("Failure while disconnecting sandbox %s (%s) from gateway network: %v",
sb.ID(), sb.ContainerID(), err) sb.ID(), sb.ContainerID(), e)
} }
} }
@ -705,7 +705,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
d, err := n.driver(!force) d, err := n.driver(!force)
if err != nil { if err != nil {
return fmt.Errorf("failed to leave endpoint: %v", err) return fmt.Errorf("failed to get driver during endpoint leave: %v", err)
} }
ep.Lock() ep.Lock()
@ -765,11 +765,11 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
logrus.Debugf("Programming external connectivity on endpoint %s (%s)", extEp.Name(), extEp.ID()) logrus.Debugf("Programming external connectivity on endpoint %s (%s)", extEp.Name(), extEp.ID())
extN, err := extEp.getNetworkFromStore() extN, err := extEp.getNetworkFromStore()
if err != nil { if err != nil {
return fmt.Errorf("failed to get network from store during leave: %v", err) return fmt.Errorf("failed to get network from store for programming external connectivity during leave: %v", err)
} }
extD, err := extN.driver(true) extD, err := extN.driver(true)
if err != nil { if err != nil {
return fmt.Errorf("failed to leave endpoint: %v", err) return fmt.Errorf("failed to get driver for programming external connectivity during leave: %v", err)
} }
if err := extD.ProgramExternalConnectivity(extEp.network.ID(), extEp.ID(), sb.Labels()); err != nil { if err := extD.ProgramExternalConnectivity(extEp.network.ID(), extEp.ID(), sb.Labels()); err != nil {
logrus.Warnf("driver failed programming external connectivity on endpoint %s: (%s) %v", logrus.Warnf("driver failed programming external connectivity on endpoint %s: (%s) %v",

View file

@ -86,6 +86,15 @@ func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
notify: notifyCh, notify: notifyCh,
}) })
nDB.RLock()
noPeers := len(nDB.nodes) <= 1
nDB.RUnlock()
// Message enqueued, do not wait for a send if no peer is present
if noPeers {
return nil
}
// Wait for the broadcast // Wait for the broadcast
select { select {
case <-notifyCh: case <-notifyCh:

View file

@ -17,7 +17,7 @@ import (
) )
const ( const (
reapInterval = 60 * time.Second reapInterval = 30 * time.Minute
reapPeriod = 5 * time.Second reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour nodeReapInterval = 24 * time.Hour