diff --git a/libnetwork/agent.go b/libnetwork/agent.go index eea1c5a0c9..b3790fd93c 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -62,7 +62,7 @@ func resolveAddr(addrOrInterface string) (string, error) { } func (c *controller) agentInit(bindAddrOrInterface string) error { - if !c.cfg.Daemon.IsAgent { + if !c.isAgent() { return nil } @@ -94,12 +94,12 @@ func (c *controller) agentInit(bindAddrOrInterface string) error { return nil } -func (c *controller) agentJoin(remotes []string) error { +func (c *controller) agentJoin(remote string) error { if c.agent == nil { return nil } - return c.agent.networkDB.Join(remotes) + return c.agent.networkDB.Join([]string{remote}) } func (c *controller) agentDriverNotify(d driverapi.Driver) { @@ -126,6 +126,7 @@ func (c *controller) agentClose() { c.agent.epTblCancel() c.agent.networkDB.Close() + c.agent = nil } func (n *network) isClusterEligible() bool { diff --git a/libnetwork/cluster/provider.go b/libnetwork/cluster/provider.go new file mode 100644 index 0000000000..3b91a41ff8 --- /dev/null +++ b/libnetwork/cluster/provider.go @@ -0,0 +1,10 @@ +package cluster + +// Provider provides clustering config details +type Provider interface { + IsManager() bool + IsAgent() bool + GetListenAddress() string + GetRemoteAddress() string + ListenClusterEvents() <-chan struct{} +} diff --git a/libnetwork/cmd/dnet/dnet.go b/libnetwork/cmd/dnet/dnet.go index 885691f009..7c827b8209 100644 --- a/libnetwork/cmd/dnet/dnet.go +++ b/libnetwork/cmd/dnet/dnet.go @@ -16,6 +16,7 @@ import ( "syscall" "time" + "github.com/BurntSushi/toml" "github.com/codegangsta/cli" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/discovery" @@ -64,13 +65,31 @@ func main() { } } -func parseConfig(cfgFile string) (*config.Config, error) { +// ParseConfig parses the libnetwork configuration file +func (d *dnetConnection) parseOrchestrationConfig(tomlCfgFile string) error { + dummy := &dnetConnection{} + + if _, err := toml.DecodeFile(tomlCfgFile, dummy); err != nil { + return err + } + + if dummy.Orchestration != nil { + d.Orchestration = dummy.Orchestration + } + return nil +} + +func (d *dnetConnection) parseConfig(cfgFile string) (*config.Config, error) { if strings.Trim(cfgFile, " ") == "" { cfgFile = os.Getenv(cfgFileEnv) if strings.Trim(cfgFile, " ") == "" { cfgFile = defaultCfgFile } } + + if err := d.parseOrchestrationConfig(cfgFile); err != nil { + return nil, err + } return config.ParseConfig(cfgFile) } @@ -91,15 +110,6 @@ func processConfig(cfg *config.Config) []config.Option { dd = cfg.Daemon.DefaultDriver } options = append(options, config.OptionDefaultDriver(dd)) - if cfg.Daemon.IsAgent { - options = append(options, config.OptionAgent()) - } - - if cfg.Daemon.Bind != "" { - options = append(options, config.OptionBind(cfg.Daemon.Bind)) - } - - options = append(options, config.OptionNeighbors(cfg.Daemon.Neighbors)) if cfg.Daemon.Labels != nil { options = append(options, config.OptionLabels(cfg.Daemon.Labels)) @@ -220,7 +230,17 @@ type dnetConnection struct { // proto holds the client protocol i.e. unix. proto string // addr holds the client address. - addr string + addr string + Orchestration *NetworkOrchestration + configEvent chan struct{} +} + +// NetworkOrchestration exported +type NetworkOrchestration struct { + Agent bool + Manager bool + Bind string + Peer string } func (d *dnetConnection) dnetDaemon(cfgFile string) error { @@ -228,10 +248,12 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { return fmt.Errorf("failed to start test driver: %v\n", err) } - cfg, err := parseConfig(cfgFile) + cfg, err := d.parseConfig(cfgFile) var cOptions []config.Option if err == nil { cOptions = processConfig(cfg) + } else { + logrus.Errorf("Error parsing config %v", err) } bridgeConfig := options.Generic{ @@ -248,6 +270,11 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { fmt.Println("Error starting dnetDaemon :", err) return err } + controller.SetClusterProvider(d) + + if d.Orchestration.Agent || d.Orchestration.Manager { + d.configEvent <- struct{}{} + } createDefaultNetwork(controller) httpHandler := api.NewHTTPHandler(controller) @@ -271,6 +298,26 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { return http.ListenAndServe(d.addr, r) } +func (d *dnetConnection) IsManager() bool { + return d.Orchestration.Manager +} + +func (d *dnetConnection) IsAgent() bool { + return d.Orchestration.Agent +} + +func (d *dnetConnection) GetListenAddress() string { + return d.Orchestration.Bind +} + +func (d *dnetConnection) GetRemoteAddress() string { + return d.Orchestration.Peer +} + +func (d *dnetConnection) ListenClusterEvents() <-chan struct{} { + return d.configEvent +} + func handleSignals(controller libnetwork.NetworkController) { c := make(chan os.Signal, 1) signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT} @@ -354,7 +401,7 @@ func newDnetConnection(val string) (*dnetConnection, error) { return nil, fmt.Errorf("dnet currently only supports tcp transport") } - return &dnetConnection{protoAddrParts[0], protoAddrParts[1]}, nil + return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil } func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) { diff --git a/libnetwork/cmd/dnet/libnetwork.toml b/libnetwork/cmd/dnet/libnetwork.toml index 2033ae6158..d45364dc41 100755 --- a/libnetwork/cmd/dnet/libnetwork.toml +++ b/libnetwork/cmd/dnet/libnetwork.toml @@ -11,3 +11,6 @@ title = "LibNetwork Configuration file" [datastore.client] provider = "consul" Address = "localhost:8500" +[orchestration] + agent = true + peer="2.2.2.2" diff --git a/libnetwork/config/config.go b/libnetwork/config/config.go index 62d9993a90..092e331d7d 100644 --- a/libnetwork/config/config.go +++ b/libnetwork/config/config.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/tlsconfig" "github.com/docker/libkv/store" + "github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/netlabel" ) @@ -21,15 +22,13 @@ type Config struct { // DaemonCfg represents libnetwork core configuration type DaemonCfg struct { - Debug bool - IsAgent bool - DataDir string - DefaultNetwork string - DefaultDriver string - Bind string - Neighbors []string - Labels []string - DriverCfg map[string]interface{} + Debug bool + DataDir string + DefaultNetwork string + DefaultDriver string + Labels []string + DriverCfg map[string]interface{} + ClusterProvider cluster.Provider } // ClusterCfg represents cluster configuration @@ -84,27 +83,6 @@ func ParseConfigOptions(cfgOptions ...Option) *Config { // to the controller type Option func(c *Config) -// OptionBind function returns an option setter for setting a bind interface or address -func OptionBind(bind string) Option { - return func(c *Config) { - c.Daemon.Bind = bind - } -} - -// OptionAgent function returns an option setter for setting agent mode -func OptionAgent() Option { - return func(c *Config) { - c.Daemon.IsAgent = true - } -} - -// OptionNeighbors function returns an option setter for setting a list of neighbors to join. -func OptionNeighbors(neighbors []string) Option { - return func(c *Config) { - c.Daemon.Neighbors = neighbors - } -} - // OptionDefaultNetwork function returns an option setter for a default network func OptionDefaultNetwork(dn string) Option { return func(c *Config) { diff --git a/libnetwork/controller.go b/libnetwork/controller.go index cc98bc7bad..1a514ad4e1 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -54,6 +54,7 @@ import ( "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/stringid" + "github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/discoverapi" @@ -110,6 +111,9 @@ type NetworkController interface { // ReloadCondfiguration updates the controller configuration ReloadConfiguration(cfgOptions ...config.Option) error + + // SetClusterProvider sets cluster provider + SetClusterProvider(provider cluster.Provider) } // NetworkWalker is a client provided function which will be used to walk the Networks. @@ -157,14 +161,6 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { serviceBindings: make(map[string]*service), } - if err := c.agentInit(c.cfg.Daemon.Bind); err != nil { - return nil, err - } - - if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil { - return nil, err - } - if err := c.initStores(); err != nil { return nil, err } @@ -210,6 +206,62 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { return c, nil } +func (c *controller) SetClusterProvider(provider cluster.Provider) { + c.cfg.Daemon.ClusterProvider = provider + go c.clusterAgentInit() +} + +func isValidClusteringIP(addr string) bool { + return addr != "" && !net.ParseIP(addr).IsLoopback() && !net.ParseIP(addr).IsUnspecified() +} + +func (c *controller) clusterAgentInit() { + clusterProvider := c.cfg.Daemon.ClusterProvider + for { + select { + case <-clusterProvider.ListenClusterEvents(): + if !c.isDistributedControl() { + bindAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress()) + remote := clusterProvider.GetRemoteAddress() + remoteAddr, _, _ := net.SplitHostPort(remote) + + // Determine the BindAddress from RemoteAddress or through best-effort routing + if !isValidClusteringIP(bindAddr) { + if !isValidClusteringIP(remoteAddr) { + remote = "8.8.8.8:53" + } + conn, err := net.Dial("udp", remote) + if err == nil { + bindHostPort := conn.LocalAddr().String() + bindAddr, _, _ = net.SplitHostPort(bindHostPort) + conn.Close() + } + } + + if bindAddr != "" && c.agent == nil { + if err := c.agentInit(bindAddr); err != nil { + log.Errorf("Error in agentInit : %v", err) + } else { + c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { + if capability.DataScope == datastore.GlobalScope { + c.agentDriverNotify(driver) + } + return false + }) + } + } + if remoteAddr != "" { + if err := c.agentJoin(remoteAddr); err != nil { + log.Errorf("Error in agentJoin : %v", err) + } + } + } else { + c.agentClose() + } + } + } +} + func (c *controller) makeDriverConfig(ntype string) map[string]interface{} { if c.cfg == nil { return nil @@ -249,28 +301,6 @@ func (c *controller) makeDriverConfig(ntype string) map[string]interface{} { var procReloadConfig = make(chan (bool), 1) -func (c *controller) processAgentConfig(cfg *config.Config) (bool, error) { - if c.cfg.Daemon.IsAgent == cfg.Daemon.IsAgent { - // Agent configuration not changed - return false, nil - } - - c.Lock() - c.cfg = cfg - c.Unlock() - - if err := c.agentInit(c.cfg.Daemon.Bind); err != nil { - return false, err - } - - if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil { - c.agentClose() - return false, err - } - - return true, nil -} - func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error { procReloadConfig <- true defer func() { <-procReloadConfig }() @@ -280,15 +310,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error { update := false cfg := config.ParseConfigOptions(cfgOptions...) - isAgentConfig, err := c.processAgentConfig(cfg) - if err != nil { - return err - } - - if isAgentConfig { - return nil - } - for s := range c.cfg.Scopes { if _, ok := cfg.Scopes[s]; !ok { return types.ForbiddenErrorf("cannot accept new configuration because it removes an existing datastore client") @@ -454,6 +475,24 @@ func (c *controller) Config() config.Config { return *c.cfg } +func (c *controller) isManager() bool { + if c.cfg == nil || c.cfg.Daemon.ClusterProvider == nil { + return false + } + return c.cfg.Daemon.ClusterProvider.IsManager() +} + +func (c *controller) isAgent() bool { + if c.cfg == nil || c.cfg.Daemon.ClusterProvider == nil { + return false + } + return c.cfg.Daemon.ClusterProvider.IsAgent() +} + +func (c *controller) isDistributedControl() bool { + return !c.isManager() && !c.isAgent() +} + func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, capability driverapi.Capability) error { c.Lock() hd := c.discovery @@ -492,13 +531,27 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ... network.processOptions(options...) + _, cap, err := network.resolveDriver(networkType, true) + if err != nil { + return nil, err + } + + if cap.DataScope == datastore.GlobalScope && !c.isDistributedControl() && !network.dynamic { + if c.isManager() { + // For non-distributed controlled environment, globalscoped non-dynamic networks are redirected to Manager + return nil, ManagerRedirectError(name) + } + + return nil, types.ForbiddenErrorf("Cannot create a multi-host network from a worker node. Please create the network from a manager node.") + } + // Make sure we have a driver available for this network type // before we allocate anything. if _, err := network.driver(true); err != nil { return nil, err } - err := network.ipamAllocate() + err = network.ipamAllocate() if err != nil { return nil, err } diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 7646fa6d5d..6d1c7985ba 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -446,7 +446,7 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error { }() // Watch for service records - if !n.getController().cfg.Daemon.IsAgent { + if !n.getController().isAgent() { n.getController().watchSvcRecord(ep) } @@ -776,7 +776,7 @@ func (ep *endpoint) Delete(force bool) error { }() // unwatch for service records - if !n.getController().cfg.Daemon.IsAgent { + if !n.getController().isAgent() { n.getController().unWatchSvcRecord(ep) } diff --git a/libnetwork/error.go b/libnetwork/error.go index 12fb6771f6..d1291f1db6 100644 --- a/libnetwork/error.go +++ b/libnetwork/error.go @@ -173,3 +173,13 @@ func (id InvalidContainerIDError) Error() string { // BadRequest denotes the type of this error func (id InvalidContainerIDError) BadRequest() {} + +// ManagerRedirectError is returned when the request should be redirected to Manager +type ManagerRedirectError string + +func (mr ManagerRedirectError) Error() string { + return "Redirect the request to the manager" +} + +// Maskable denotes the type of this error +func (mr ManagerRedirectError) Maskable() {} diff --git a/libnetwork/network.go b/libnetwork/network.go index 90d9ff8920..76524b5dee 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -64,6 +64,7 @@ type NetworkInfo interface { IPv6Enabled() bool Internal() bool Labels() map[string]string + Dynamic() bool } // EndpointWalker is a client provided function which will be used to walk the Endpoints. @@ -187,6 +188,7 @@ type network struct { inDelete bool ingress bool driverTables []string + dynamic bool sync.Mutex } @@ -631,6 +633,13 @@ func NetworkOptionLabels(labels map[string]string) NetworkOption { } } +// NetworkOptionDynamic function returns an option setter for dynamic option for a network +func NetworkOptionDynamic() NetworkOption { + return func(n *network) { + n.dynamic = true + } +} + // NetworkOptionDeferIPv6Alloc instructs the network to defer the IPV6 address allocation until after the endpoint has been created // It is being provided to support the specific docker daemon flags where user can deterministically assign an IPv6 address // to a container as combination of fixed-cidr-v6 + mac-address @@ -697,7 +706,7 @@ func (n *network) driver(load bool) (driverapi.Driver, error) { if cap != nil { n.scope = cap.DataScope } - if c.cfg.Daemon.IsAgent { + if c.isAgent() { // If we are running in agent mode then all networks // in libnetwork are local scope regardless of the // backing driver. @@ -1455,6 +1464,13 @@ func (n *network) Internal() bool { return n.internal } +func (n *network) Dynamic() bool { + n.Lock() + defer n.Unlock() + + return n.dynamic +} + func (n *network) IPv6Enabled() bool { n.Lock() defer n.Unlock() diff --git a/libnetwork/test/integration/dnet/helpers.bash b/libnetwork/test/integration/dnet/helpers.bash index de69eb325d..8424cae7e2 100644 --- a/libnetwork/test/integration/dnet/helpers.bash +++ b/libnetwork/test/integration/dnet/helpers.bash @@ -161,7 +161,7 @@ function start_dnet() { read discovery provider address < <(parse_discovery_str consul://${bridge_ip}:8500/custom_prefix) else if [ "$nip" != "" ]; then - neighbors="neighbors = [\"${nip}:7946\"]" + neighbors=${nip} fi discovery="" @@ -190,9 +190,10 @@ title = "LibNetwork Configuration file for ${name}" [daemon] debug = false - isagent = true +[orchestration] + agent = true bind = "eth0" - ${neighbors} + peer = "${neighbors}" EOF fi diff --git a/libnetwork/test/integration/dnet/run-integration-tests.sh b/libnetwork/test/integration/dnet/run-integration-tests.sh index e8efcce26e..43f9386d68 100755 --- a/libnetwork/test/integration/dnet/run-integration-tests.sh +++ b/libnetwork/test/integration/dnet/run-integration-tests.sh @@ -242,9 +242,9 @@ if [ -z "$SUITES" ]; then then # We can only run a limited list of suites in circleci because of the # old kernel and limited docker environment. - suites="dnet simple_consul multi_consul multi_zk multi_etcd" + suites="dnet multi_consul multi_zk multi_etcd" else - suites="dnet simple_consul multi_consul multi_zk multi_etcd bridge overlay_consul overlay_consul_host overlay_zk overlay_etcd" + suites="dnet multi_consul multi_zk multi_etcd bridge overlay_consul overlay_consul_host overlay_zk overlay_etcd" fi else suites="$SUITES"