mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Provide a way for libnetwork to make use of Agent mode functionalities
Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
parent
79c0292f53
commit
9054ac2b48
11 changed files with 213 additions and 94 deletions
|
@ -62,7 +62,7 @@ func resolveAddr(addrOrInterface string) (string, error) {
|
|||
}
|
||||
|
||||
func (c *controller) agentInit(bindAddrOrInterface string) error {
|
||||
if !c.cfg.Daemon.IsAgent {
|
||||
if !c.isAgent() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -94,12 +94,12 @@ func (c *controller) agentInit(bindAddrOrInterface string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) agentJoin(remotes []string) error {
|
||||
func (c *controller) agentJoin(remote string) error {
|
||||
if c.agent == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.agent.networkDB.Join(remotes)
|
||||
return c.agent.networkDB.Join([]string{remote})
|
||||
}
|
||||
|
||||
func (c *controller) agentDriverNotify(d driverapi.Driver) {
|
||||
|
@ -126,6 +126,7 @@ func (c *controller) agentClose() {
|
|||
c.agent.epTblCancel()
|
||||
|
||||
c.agent.networkDB.Close()
|
||||
c.agent = nil
|
||||
}
|
||||
|
||||
func (n *network) isClusterEligible() bool {
|
||||
|
|
10
libnetwork/cluster/provider.go
Normal file
10
libnetwork/cluster/provider.go
Normal file
|
@ -0,0 +1,10 @@
|
|||
package cluster
|
||||
|
||||
// Provider provides clustering config details
|
||||
type Provider interface {
|
||||
IsManager() bool
|
||||
IsAgent() bool
|
||||
GetListenAddress() string
|
||||
GetRemoteAddress() string
|
||||
ListenClusterEvents() <-chan struct{}
|
||||
}
|
|
@ -16,6 +16,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/codegangsta/cli"
|
||||
"github.com/docker/docker/opts"
|
||||
"github.com/docker/docker/pkg/discovery"
|
||||
|
@ -64,13 +65,31 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
func parseConfig(cfgFile string) (*config.Config, error) {
|
||||
// ParseConfig parses the libnetwork configuration file
|
||||
func (d *dnetConnection) parseOrchestrationConfig(tomlCfgFile string) error {
|
||||
dummy := &dnetConnection{}
|
||||
|
||||
if _, err := toml.DecodeFile(tomlCfgFile, dummy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if dummy.Orchestration != nil {
|
||||
d.Orchestration = dummy.Orchestration
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dnetConnection) parseConfig(cfgFile string) (*config.Config, error) {
|
||||
if strings.Trim(cfgFile, " ") == "" {
|
||||
cfgFile = os.Getenv(cfgFileEnv)
|
||||
if strings.Trim(cfgFile, " ") == "" {
|
||||
cfgFile = defaultCfgFile
|
||||
}
|
||||
}
|
||||
|
||||
if err := d.parseOrchestrationConfig(cfgFile); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return config.ParseConfig(cfgFile)
|
||||
}
|
||||
|
||||
|
@ -91,15 +110,6 @@ func processConfig(cfg *config.Config) []config.Option {
|
|||
dd = cfg.Daemon.DefaultDriver
|
||||
}
|
||||
options = append(options, config.OptionDefaultDriver(dd))
|
||||
if cfg.Daemon.IsAgent {
|
||||
options = append(options, config.OptionAgent())
|
||||
}
|
||||
|
||||
if cfg.Daemon.Bind != "" {
|
||||
options = append(options, config.OptionBind(cfg.Daemon.Bind))
|
||||
}
|
||||
|
||||
options = append(options, config.OptionNeighbors(cfg.Daemon.Neighbors))
|
||||
|
||||
if cfg.Daemon.Labels != nil {
|
||||
options = append(options, config.OptionLabels(cfg.Daemon.Labels))
|
||||
|
@ -220,7 +230,17 @@ type dnetConnection struct {
|
|||
// proto holds the client protocol i.e. unix.
|
||||
proto string
|
||||
// addr holds the client address.
|
||||
addr string
|
||||
addr string
|
||||
Orchestration *NetworkOrchestration
|
||||
configEvent chan struct{}
|
||||
}
|
||||
|
||||
// NetworkOrchestration exported
|
||||
type NetworkOrchestration struct {
|
||||
Agent bool
|
||||
Manager bool
|
||||
Bind string
|
||||
Peer string
|
||||
}
|
||||
|
||||
func (d *dnetConnection) dnetDaemon(cfgFile string) error {
|
||||
|
@ -228,10 +248,12 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
|
|||
return fmt.Errorf("failed to start test driver: %v\n", err)
|
||||
}
|
||||
|
||||
cfg, err := parseConfig(cfgFile)
|
||||
cfg, err := d.parseConfig(cfgFile)
|
||||
var cOptions []config.Option
|
||||
if err == nil {
|
||||
cOptions = processConfig(cfg)
|
||||
} else {
|
||||
logrus.Errorf("Error parsing config %v", err)
|
||||
}
|
||||
|
||||
bridgeConfig := options.Generic{
|
||||
|
@ -248,6 +270,11 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
|
|||
fmt.Println("Error starting dnetDaemon :", err)
|
||||
return err
|
||||
}
|
||||
controller.SetClusterProvider(d)
|
||||
|
||||
if d.Orchestration.Agent || d.Orchestration.Manager {
|
||||
d.configEvent <- struct{}{}
|
||||
}
|
||||
|
||||
createDefaultNetwork(controller)
|
||||
httpHandler := api.NewHTTPHandler(controller)
|
||||
|
@ -271,6 +298,26 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error {
|
|||
return http.ListenAndServe(d.addr, r)
|
||||
}
|
||||
|
||||
func (d *dnetConnection) IsManager() bool {
|
||||
return d.Orchestration.Manager
|
||||
}
|
||||
|
||||
func (d *dnetConnection) IsAgent() bool {
|
||||
return d.Orchestration.Agent
|
||||
}
|
||||
|
||||
func (d *dnetConnection) GetListenAddress() string {
|
||||
return d.Orchestration.Bind
|
||||
}
|
||||
|
||||
func (d *dnetConnection) GetRemoteAddress() string {
|
||||
return d.Orchestration.Peer
|
||||
}
|
||||
|
||||
func (d *dnetConnection) ListenClusterEvents() <-chan struct{} {
|
||||
return d.configEvent
|
||||
}
|
||||
|
||||
func handleSignals(controller libnetwork.NetworkController) {
|
||||
c := make(chan os.Signal, 1)
|
||||
signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}
|
||||
|
@ -354,7 +401,7 @@ func newDnetConnection(val string) (*dnetConnection, error) {
|
|||
return nil, fmt.Errorf("dnet currently only supports tcp transport")
|
||||
}
|
||||
|
||||
return &dnetConnection{protoAddrParts[0], protoAddrParts[1]}, nil
|
||||
return &dnetConnection{protoAddrParts[0], protoAddrParts[1], &NetworkOrchestration{}, make(chan struct{}, 10)}, nil
|
||||
}
|
||||
|
||||
func (d *dnetConnection) httpCall(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, http.Header, int, error) {
|
||||
|
|
|
@ -11,3 +11,6 @@ title = "LibNetwork Configuration file"
|
|||
[datastore.client]
|
||||
provider = "consul"
|
||||
Address = "localhost:8500"
|
||||
[orchestration]
|
||||
agent = true
|
||||
peer="2.2.2.2"
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/docker/docker/pkg/discovery"
|
||||
"github.com/docker/docker/pkg/tlsconfig"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libnetwork/cluster"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/netlabel"
|
||||
)
|
||||
|
@ -21,15 +22,13 @@ type Config struct {
|
|||
|
||||
// DaemonCfg represents libnetwork core configuration
|
||||
type DaemonCfg struct {
|
||||
Debug bool
|
||||
IsAgent bool
|
||||
DataDir string
|
||||
DefaultNetwork string
|
||||
DefaultDriver string
|
||||
Bind string
|
||||
Neighbors []string
|
||||
Labels []string
|
||||
DriverCfg map[string]interface{}
|
||||
Debug bool
|
||||
DataDir string
|
||||
DefaultNetwork string
|
||||
DefaultDriver string
|
||||
Labels []string
|
||||
DriverCfg map[string]interface{}
|
||||
ClusterProvider cluster.Provider
|
||||
}
|
||||
|
||||
// ClusterCfg represents cluster configuration
|
||||
|
@ -84,27 +83,6 @@ func ParseConfigOptions(cfgOptions ...Option) *Config {
|
|||
// to the controller
|
||||
type Option func(c *Config)
|
||||
|
||||
// OptionBind function returns an option setter for setting a bind interface or address
|
||||
func OptionBind(bind string) Option {
|
||||
return func(c *Config) {
|
||||
c.Daemon.Bind = bind
|
||||
}
|
||||
}
|
||||
|
||||
// OptionAgent function returns an option setter for setting agent mode
|
||||
func OptionAgent() Option {
|
||||
return func(c *Config) {
|
||||
c.Daemon.IsAgent = true
|
||||
}
|
||||
}
|
||||
|
||||
// OptionNeighbors function returns an option setter for setting a list of neighbors to join.
|
||||
func OptionNeighbors(neighbors []string) Option {
|
||||
return func(c *Config) {
|
||||
c.Daemon.Neighbors = neighbors
|
||||
}
|
||||
}
|
||||
|
||||
// OptionDefaultNetwork function returns an option setter for a default network
|
||||
func OptionDefaultNetwork(dn string) Option {
|
||||
return func(c *Config) {
|
||||
|
|
|
@ -54,6 +54,7 @@ import (
|
|||
"github.com/docker/docker/pkg/discovery"
|
||||
"github.com/docker/docker/pkg/plugins"
|
||||
"github.com/docker/docker/pkg/stringid"
|
||||
"github.com/docker/libnetwork/cluster"
|
||||
"github.com/docker/libnetwork/config"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/discoverapi"
|
||||
|
@ -110,6 +111,9 @@ type NetworkController interface {
|
|||
|
||||
// ReloadCondfiguration updates the controller configuration
|
||||
ReloadConfiguration(cfgOptions ...config.Option) error
|
||||
|
||||
// SetClusterProvider sets cluster provider
|
||||
SetClusterProvider(provider cluster.Provider)
|
||||
}
|
||||
|
||||
// NetworkWalker is a client provided function which will be used to walk the Networks.
|
||||
|
@ -157,14 +161,6 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
|
|||
serviceBindings: make(map[string]*service),
|
||||
}
|
||||
|
||||
if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.initStores(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -210,6 +206,62 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
|
|||
return c, nil
|
||||
}
|
||||
|
||||
func (c *controller) SetClusterProvider(provider cluster.Provider) {
|
||||
c.cfg.Daemon.ClusterProvider = provider
|
||||
go c.clusterAgentInit()
|
||||
}
|
||||
|
||||
func isValidClusteringIP(addr string) bool {
|
||||
return addr != "" && !net.ParseIP(addr).IsLoopback() && !net.ParseIP(addr).IsUnspecified()
|
||||
}
|
||||
|
||||
func (c *controller) clusterAgentInit() {
|
||||
clusterProvider := c.cfg.Daemon.ClusterProvider
|
||||
for {
|
||||
select {
|
||||
case <-clusterProvider.ListenClusterEvents():
|
||||
if !c.isDistributedControl() {
|
||||
bindAddr, _, _ := net.SplitHostPort(clusterProvider.GetListenAddress())
|
||||
remote := clusterProvider.GetRemoteAddress()
|
||||
remoteAddr, _, _ := net.SplitHostPort(remote)
|
||||
|
||||
// Determine the BindAddress from RemoteAddress or through best-effort routing
|
||||
if !isValidClusteringIP(bindAddr) {
|
||||
if !isValidClusteringIP(remoteAddr) {
|
||||
remote = "8.8.8.8:53"
|
||||
}
|
||||
conn, err := net.Dial("udp", remote)
|
||||
if err == nil {
|
||||
bindHostPort := conn.LocalAddr().String()
|
||||
bindAddr, _, _ = net.SplitHostPort(bindHostPort)
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if bindAddr != "" && c.agent == nil {
|
||||
if err := c.agentInit(bindAddr); err != nil {
|
||||
log.Errorf("Error in agentInit : %v", err)
|
||||
} else {
|
||||
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
|
||||
if capability.DataScope == datastore.GlobalScope {
|
||||
c.agentDriverNotify(driver)
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
}
|
||||
if remoteAddr != "" {
|
||||
if err := c.agentJoin(remoteAddr); err != nil {
|
||||
log.Errorf("Error in agentJoin : %v", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
c.agentClose()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
|
||||
if c.cfg == nil {
|
||||
return nil
|
||||
|
@ -249,28 +301,6 @@ func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
|
|||
|
||||
var procReloadConfig = make(chan (bool), 1)
|
||||
|
||||
func (c *controller) processAgentConfig(cfg *config.Config) (bool, error) {
|
||||
if c.cfg.Daemon.IsAgent == cfg.Daemon.IsAgent {
|
||||
// Agent configuration not changed
|
||||
return false, nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.cfg = cfg
|
||||
c.Unlock()
|
||||
|
||||
if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
|
||||
c.agentClose()
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
|
||||
procReloadConfig <- true
|
||||
defer func() { <-procReloadConfig }()
|
||||
|
@ -280,15 +310,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
|
|||
update := false
|
||||
cfg := config.ParseConfigOptions(cfgOptions...)
|
||||
|
||||
isAgentConfig, err := c.processAgentConfig(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isAgentConfig {
|
||||
return nil
|
||||
}
|
||||
|
||||
for s := range c.cfg.Scopes {
|
||||
if _, ok := cfg.Scopes[s]; !ok {
|
||||
return types.ForbiddenErrorf("cannot accept new configuration because it removes an existing datastore client")
|
||||
|
@ -454,6 +475,24 @@ func (c *controller) Config() config.Config {
|
|||
return *c.cfg
|
||||
}
|
||||
|
||||
func (c *controller) isManager() bool {
|
||||
if c.cfg == nil || c.cfg.Daemon.ClusterProvider == nil {
|
||||
return false
|
||||
}
|
||||
return c.cfg.Daemon.ClusterProvider.IsManager()
|
||||
}
|
||||
|
||||
func (c *controller) isAgent() bool {
|
||||
if c.cfg == nil || c.cfg.Daemon.ClusterProvider == nil {
|
||||
return false
|
||||
}
|
||||
return c.cfg.Daemon.ClusterProvider.IsAgent()
|
||||
}
|
||||
|
||||
func (c *controller) isDistributedControl() bool {
|
||||
return !c.isManager() && !c.isAgent()
|
||||
}
|
||||
|
||||
func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, capability driverapi.Capability) error {
|
||||
c.Lock()
|
||||
hd := c.discovery
|
||||
|
@ -492,13 +531,27 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
|
|||
|
||||
network.processOptions(options...)
|
||||
|
||||
_, cap, err := network.resolveDriver(networkType, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cap.DataScope == datastore.GlobalScope && !c.isDistributedControl() && !network.dynamic {
|
||||
if c.isManager() {
|
||||
// For non-distributed controlled environment, globalscoped non-dynamic networks are redirected to Manager
|
||||
return nil, ManagerRedirectError(name)
|
||||
}
|
||||
|
||||
return nil, types.ForbiddenErrorf("Cannot create a multi-host network from a worker node. Please create the network from a manager node.")
|
||||
}
|
||||
|
||||
// Make sure we have a driver available for this network type
|
||||
// before we allocate anything.
|
||||
if _, err := network.driver(true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := network.ipamAllocate()
|
||||
err = network.ipamAllocate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -446,7 +446,7 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
|
|||
}()
|
||||
|
||||
// Watch for service records
|
||||
if !n.getController().cfg.Daemon.IsAgent {
|
||||
if !n.getController().isAgent() {
|
||||
n.getController().watchSvcRecord(ep)
|
||||
}
|
||||
|
||||
|
@ -776,7 +776,7 @@ func (ep *endpoint) Delete(force bool) error {
|
|||
}()
|
||||
|
||||
// unwatch for service records
|
||||
if !n.getController().cfg.Daemon.IsAgent {
|
||||
if !n.getController().isAgent() {
|
||||
n.getController().unWatchSvcRecord(ep)
|
||||
}
|
||||
|
||||
|
|
|
@ -173,3 +173,13 @@ func (id InvalidContainerIDError) Error() string {
|
|||
|
||||
// BadRequest denotes the type of this error
|
||||
func (id InvalidContainerIDError) BadRequest() {}
|
||||
|
||||
// ManagerRedirectError is returned when the request should be redirected to Manager
|
||||
type ManagerRedirectError string
|
||||
|
||||
func (mr ManagerRedirectError) Error() string {
|
||||
return "Redirect the request to the manager"
|
||||
}
|
||||
|
||||
// Maskable denotes the type of this error
|
||||
func (mr ManagerRedirectError) Maskable() {}
|
||||
|
|
|
@ -64,6 +64,7 @@ type NetworkInfo interface {
|
|||
IPv6Enabled() bool
|
||||
Internal() bool
|
||||
Labels() map[string]string
|
||||
Dynamic() bool
|
||||
}
|
||||
|
||||
// EndpointWalker is a client provided function which will be used to walk the Endpoints.
|
||||
|
@ -187,6 +188,7 @@ type network struct {
|
|||
inDelete bool
|
||||
ingress bool
|
||||
driverTables []string
|
||||
dynamic bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -631,6 +633,13 @@ func NetworkOptionLabels(labels map[string]string) NetworkOption {
|
|||
}
|
||||
}
|
||||
|
||||
// NetworkOptionDynamic function returns an option setter for dynamic option for a network
|
||||
func NetworkOptionDynamic() NetworkOption {
|
||||
return func(n *network) {
|
||||
n.dynamic = true
|
||||
}
|
||||
}
|
||||
|
||||
// NetworkOptionDeferIPv6Alloc instructs the network to defer the IPV6 address allocation until after the endpoint has been created
|
||||
// It is being provided to support the specific docker daemon flags where user can deterministically assign an IPv6 address
|
||||
// to a container as combination of fixed-cidr-v6 + mac-address
|
||||
|
@ -697,7 +706,7 @@ func (n *network) driver(load bool) (driverapi.Driver, error) {
|
|||
if cap != nil {
|
||||
n.scope = cap.DataScope
|
||||
}
|
||||
if c.cfg.Daemon.IsAgent {
|
||||
if c.isAgent() {
|
||||
// If we are running in agent mode then all networks
|
||||
// in libnetwork are local scope regardless of the
|
||||
// backing driver.
|
||||
|
@ -1455,6 +1464,13 @@ func (n *network) Internal() bool {
|
|||
return n.internal
|
||||
}
|
||||
|
||||
func (n *network) Dynamic() bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
return n.dynamic
|
||||
}
|
||||
|
||||
func (n *network) IPv6Enabled() bool {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
|
|
@ -161,7 +161,7 @@ function start_dnet() {
|
|||
read discovery provider address < <(parse_discovery_str consul://${bridge_ip}:8500/custom_prefix)
|
||||
else
|
||||
if [ "$nip" != "" ]; then
|
||||
neighbors="neighbors = [\"${nip}:7946\"]"
|
||||
neighbors=${nip}
|
||||
fi
|
||||
|
||||
discovery=""
|
||||
|
@ -190,9 +190,10 @@ title = "LibNetwork Configuration file for ${name}"
|
|||
|
||||
[daemon]
|
||||
debug = false
|
||||
isagent = true
|
||||
[orchestration]
|
||||
agent = true
|
||||
bind = "eth0"
|
||||
${neighbors}
|
||||
peer = "${neighbors}"
|
||||
EOF
|
||||
fi
|
||||
|
||||
|
|
|
@ -242,9 +242,9 @@ if [ -z "$SUITES" ]; then
|
|||
then
|
||||
# We can only run a limited list of suites in circleci because of the
|
||||
# old kernel and limited docker environment.
|
||||
suites="dnet simple_consul multi_consul multi_zk multi_etcd"
|
||||
suites="dnet multi_consul multi_zk multi_etcd"
|
||||
else
|
||||
suites="dnet simple_consul multi_consul multi_zk multi_etcd bridge overlay_consul overlay_consul_host overlay_zk overlay_etcd"
|
||||
suites="dnet multi_consul multi_zk multi_etcd bridge overlay_consul overlay_consul_host overlay_zk overlay_etcd"
|
||||
fi
|
||||
else
|
||||
suites="$SUITES"
|
||||
|
|
Loading…
Reference in a new issue