libnetwork: remove external DS-based host discovery
Signed-off-by: Anca Iordache <anca.iordache@docker.com> Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
parent
c741ab0efa
commit
00f9b23c3a
|
@ -11,9 +11,7 @@ import (
|
||||||
"github.com/docker/docker/libnetwork/netlabel"
|
"github.com/docker/docker/libnetwork/netlabel"
|
||||||
"github.com/docker/docker/libnetwork/osl"
|
"github.com/docker/docker/libnetwork/osl"
|
||||||
"github.com/docker/docker/libnetwork/portallocator"
|
"github.com/docker/docker/libnetwork/portallocator"
|
||||||
"github.com/docker/docker/pkg/discovery"
|
|
||||||
"github.com/docker/docker/pkg/plugingetter"
|
"github.com/docker/docker/pkg/plugingetter"
|
||||||
"github.com/docker/go-connections/tlsconfig"
|
|
||||||
"github.com/docker/libkv/store"
|
"github.com/docker/libkv/store"
|
||||||
"github.com/pelletier/go-toml"
|
"github.com/pelletier/go-toml"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -50,7 +48,6 @@ type DaemonCfg struct {
|
||||||
|
|
||||||
// ClusterCfg represents cluster configuration
|
// ClusterCfg represents cluster configuration
|
||||||
type ClusterCfg struct {
|
type ClusterCfg struct {
|
||||||
Watcher discovery.Watcher
|
|
||||||
Address string
|
Address string
|
||||||
Discovery string
|
Discovery string
|
||||||
Heartbeat uint64
|
Heartbeat uint64
|
||||||
|
@ -144,76 +141,6 @@ func OptionLabels(labels []string) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OptionKVProvider function returns an option setter for kvstore provider
|
|
||||||
func OptionKVProvider(provider string) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
logrus.Debugf("Option OptionKVProvider: %s", provider)
|
|
||||||
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
|
|
||||||
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
|
|
||||||
}
|
|
||||||
c.Scopes[datastore.GlobalScope].Client.Provider = strings.TrimSpace(provider)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OptionKVProviderURL function returns an option setter for kvstore url
|
|
||||||
func OptionKVProviderURL(url string) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
logrus.Debugf("Option OptionKVProviderURL: %s", url)
|
|
||||||
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
|
|
||||||
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
|
|
||||||
}
|
|
||||||
c.Scopes[datastore.GlobalScope].Client.Address = strings.TrimSpace(url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OptionKVOpts function returns an option setter for kvstore options
|
|
||||||
func OptionKVOpts(opts map[string]string) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
if opts["kv.cacertfile"] != "" && opts["kv.certfile"] != "" && opts["kv.keyfile"] != "" {
|
|
||||||
logrus.Info("Option Initializing KV with TLS")
|
|
||||||
tlsConfig, err := tlsconfig.Client(tlsconfig.Options{
|
|
||||||
CAFile: opts["kv.cacertfile"],
|
|
||||||
CertFile: opts["kv.certfile"],
|
|
||||||
KeyFile: opts["kv.keyfile"],
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("Unable to set up TLS: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
|
|
||||||
c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{}
|
|
||||||
}
|
|
||||||
if c.Scopes[datastore.GlobalScope].Client.Config == nil {
|
|
||||||
c.Scopes[datastore.GlobalScope].Client.Config = &store.Config{TLS: tlsConfig}
|
|
||||||
} else {
|
|
||||||
c.Scopes[datastore.GlobalScope].Client.Config.TLS = tlsConfig
|
|
||||||
}
|
|
||||||
// Workaround libkv/etcd bug for https
|
|
||||||
c.Scopes[datastore.GlobalScope].Client.Config.ClientTLS = &store.ClientTLSConfig{
|
|
||||||
CACertFile: opts["kv.cacertfile"],
|
|
||||||
CertFile: opts["kv.certfile"],
|
|
||||||
KeyFile: opts["kv.keyfile"],
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Info("Option Initializing KV without TLS")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OptionDiscoveryWatcher function returns an option setter for discovery watcher
|
|
||||||
func OptionDiscoveryWatcher(watcher discovery.Watcher) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
c.Cluster.Watcher = watcher
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OptionDiscoveryAddress function returns an option setter for self discovery address
|
|
||||||
func OptionDiscoveryAddress(address string) Option {
|
|
||||||
return func(c *Config) {
|
|
||||||
c.Cluster.Address = address
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OptionDataDir function returns an option setter for data folder
|
// OptionDataDir function returns an option setter for data folder
|
||||||
func OptionDataDir(dataDir string) Option {
|
func OptionDataDir(dataDir string) Option {
|
||||||
return func(c *Config) {
|
return func(c *Config) {
|
||||||
|
|
|
@ -1,11 +1,9 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/docker/docker/libnetwork/datastore"
|
|
||||||
"github.com/docker/docker/libnetwork/netlabel"
|
"github.com/docker/docker/libnetwork/netlabel"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -54,90 +52,3 @@ func TestValidName(t *testing.T) {
|
||||||
t.Fatal("Name validation succeeds for a case when it is expected to fail")
|
t.Fatal("Name validation succeeds for a case when it is expected to fail")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTLSConfiguration(t *testing.T) {
|
|
||||||
cert := `-----BEGIN CERTIFICATE-----
|
|
||||||
MIIDCDCCAfKgAwIBAgIICifG7YeiQOEwCwYJKoZIhvcNAQELMBIxEDAOBgNVBAMT
|
|
||||||
B1Rlc3QgQ0EwHhcNMTUxMDAxMjMwMDAwWhcNMjAwOTI5MjMwMDAwWjASMRAwDgYD
|
|
||||||
VQQDEwdUZXN0IENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1wRC
|
|
||||||
O+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4+zE9h80aC4hz+6caRpds
|
|
||||||
+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhRSoSi3nY+B7F2E8cuz14q
|
|
||||||
V2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZrpXUyXxAvzXfpFXo1RhSb
|
|
||||||
UywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUerVYrCPq8vqfn//01qz55
|
|
||||||
Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHojxOpXTBepUCIJLbtNnWFT
|
|
||||||
V44t9gh5IqIWtoBReQIDAQABo2YwZDAOBgNVHQ8BAf8EBAMCAAYwEgYDVR0TAQH/
|
|
||||||
BAgwBgEB/wIBAjAdBgNVHQ4EFgQUZKUI8IIjIww7X/6hvwggQK4bD24wHwYDVR0j
|
|
||||||
BBgwFoAUZKUI8IIjIww7X/6hvwggQK4bD24wCwYJKoZIhvcNAQELA4IBAQDES2cz
|
|
||||||
7sCQfDCxCIWH7X8kpi/JWExzUyQEJ0rBzN1m3/x8ySRxtXyGekimBqQwQdFqlwMI
|
|
||||||
xzAQKkh3ue8tNSzRbwqMSyH14N1KrSxYS9e9szJHfUasoTpQGPmDmGIoRJuq1h6M
|
|
||||||
ej5x1SCJ7GWCR6xEXKUIE9OftXm9TdFzWa7Ja3OHz/mXteii8VXDuZ5ACq6EE5bY
|
|
||||||
8sP4gcICfJ5fTrpTlk9FIqEWWQrCGa5wk95PGEj+GJpNogjXQ97wVoo/Y3p1brEn
|
|
||||||
t5zjN9PAq4H1fuCMdNNA+p1DHNwd+ELTxcMAnb2ajwHvV6lKPXutrTFc4umJToBX
|
|
||||||
FpTxDmJHEV4bzUzh
|
|
||||||
-----END CERTIFICATE-----
|
|
||||||
`
|
|
||||||
key := `-----BEGIN RSA PRIVATE KEY-----
|
|
||||||
MIIEpQIBAAKCAQEA1wRCO+flnLTK5ImjTurNRHwSejuqGbc4CAvpB0hS+z0QlSs4
|
|
||||||
+zE9h80aC4hz+6caRpds+J908Q+RvAittMHbpc7VjbZP72G6fiXk7yPPl6C10HhR
|
|
||||||
SoSi3nY+B7F2E8cuz14qV2e+ejhWhSrBb/keyXpcyjoW1BOAAJ2TIclRRkICSCZr
|
|
||||||
pXUyXxAvzXfpFXo1RhSbUywN11pfiCQzDUN7sPww9UzFHuAHZHoyfTr27XnJYVUe
|
|
||||||
rVYrCPq8vqfn//01qz55Xs0hvzGdlTFXhuabFtQnKFH5SNwo/fcznhB7rePOwHoj
|
|
||||||
xOpXTBepUCIJLbtNnWFTV44t9gh5IqIWtoBReQIDAQABAoIBAHSWipORGp/uKFXj
|
|
||||||
i/mut776x8ofsAxhnLBARQr93ID+i49W8H7EJGkOfaDjTICYC1dbpGrri61qk8sx
|
|
||||||
qX7p3v/5NzKwOIfEpirgwVIqSNYe/ncbxnhxkx6tXtUtFKmEx40JskvSpSYAhmmO
|
|
||||||
1XSx0E/PWaEN/nLgX/f1eWJIlxlQkk3QeqL+FGbCXI48DEtlJ9+MzMu4pAwZTpj5
|
|
||||||
5qtXo5JJ0jRGfJVPAOznRsYqv864AhMdMIWguzk6EGnbaCWwPcfcn+h9a5LMdony
|
|
||||||
MDHfBS7bb5tkF3+AfnVY3IBMVx7YlsD9eAyajlgiKu4zLbwTRHjXgShy+4Oussz0
|
|
||||||
ugNGnkECgYEA/hi+McrZC8C4gg6XqK8+9joD8tnyDZDz88BQB7CZqABUSwvjDqlP
|
|
||||||
L8hcwo/lzvjBNYGkqaFPUICGWKjeCtd8pPS2DCVXxDQX4aHF1vUur0uYNncJiV3N
|
|
||||||
XQz4Iemsa6wnKf6M67b5vMXICw7dw0HZCdIHD1hnhdtDz0uVpeevLZ8CgYEA2KCT
|
|
||||||
Y43lorjrbCgMqtlefkr3GJA9dey+hTzCiWEOOqn9RqGoEGUday0sKhiLofOgmN2B
|
|
||||||
LEukpKIey8s+Q/cb6lReajDVPDsMweX8i7hz3Wa4Ugp4Xa5BpHqu8qIAE2JUZ7bU
|
|
||||||
t88aQAYE58pUF+/Lq1QzAQdrjjzQBx6SrBxieecCgYEAvukoPZEC8mmiN1VvbTX+
|
|
||||||
QFHmlZha3QaDxChB+QUe7bMRojEUL/fVnzkTOLuVFqSfxevaI/km9n0ac5KtAchV
|
|
||||||
xjp2bTnBb5EUQFqjopYktWA+xO07JRJtMfSEmjZPbbay1kKC7rdTfBm961EIHaRj
|
|
||||||
xZUf6M+rOE8964oGrdgdLlECgYEA046GQmx6fh7/82FtdZDRQp9tj3SWQUtSiQZc
|
|
||||||
qhO59Lq8mjUXz+MgBuJXxkiwXRpzlbaFB0Bca1fUoYw8o915SrDYf/Zu2OKGQ/qa
|
|
||||||
V81sgiVmDuEgycR7YOlbX6OsVUHrUlpwhY3hgfMe6UtkMvhBvHF/WhroBEIJm1pV
|
|
||||||
PXZ/CbMCgYEApNWVktFBjOaYfY6SNn4iSts1jgsQbbpglg3kT7PLKjCAhI6lNsbk
|
|
||||||
dyT7ut01PL6RaW4SeQWtrJIVQaM6vF3pprMKqlc5XihOGAmVqH7rQx9rtQB5TicL
|
|
||||||
BFrwkQE4HQtQBV60hYQUzzlSk44VFDz+jxIEtacRHaomDRh2FtOTz+I=
|
|
||||||
-----END RSA PRIVATE KEY-----
|
|
||||||
`
|
|
||||||
certFile, err := os.CreateTemp("", "cert")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to setup temp file: %s", err)
|
|
||||||
}
|
|
||||||
defer os.Remove(certFile.Name())
|
|
||||||
certFile.Write([]byte(cert))
|
|
||||||
certFile.Close()
|
|
||||||
keyFile, err := os.CreateTemp("", "key")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to setup temp file: %s", err)
|
|
||||||
}
|
|
||||||
defer os.Remove(keyFile.Name())
|
|
||||||
keyFile.Write([]byte(key))
|
|
||||||
keyFile.Close()
|
|
||||||
|
|
||||||
c := &Config{Scopes: map[string]*datastore.ScopeCfg{}}
|
|
||||||
l := map[string]string{
|
|
||||||
"kv.cacertfile": certFile.Name(),
|
|
||||||
"kv.certfile": certFile.Name(),
|
|
||||||
"kv.keyfile": keyFile.Name(),
|
|
||||||
}
|
|
||||||
f := OptionKVOpts(l)
|
|
||||||
f(c)
|
|
||||||
if _, ok := c.Scopes[datastore.GlobalScope]; !ok {
|
|
||||||
t.Fatal("GlobalScope not established")
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.Scopes[datastore.GlobalScope].Client.Config.TLS == nil {
|
|
||||||
t.Fatal("TLS is nil")
|
|
||||||
}
|
|
||||||
if c.Scopes[datastore.GlobalScope].Client.Config.TLS.RootCAs == nil {
|
|
||||||
t.Fatal("TLS.RootCAs is nil")
|
|
||||||
}
|
|
||||||
if len(c.Scopes[datastore.GlobalScope].Client.Config.TLS.Certificates) != 1 {
|
|
||||||
t.Fatal("TLS.Certificates is not length 1")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -7,6 +7,3 @@ title = "LibNetwork Configuration file"
|
||||||
Address = "Cluster-wide reachable Host IP"
|
Address = "Cluster-wide reachable Host IP"
|
||||||
[datastore]
|
[datastore]
|
||||||
embedded = false
|
embedded = false
|
||||||
[datastore.client]
|
|
||||||
provider = "consul"
|
|
||||||
Address = "localhost:8500"
|
|
||||||
|
|
|
@ -59,13 +59,11 @@ import (
|
||||||
"github.com/docker/docker/libnetwork/discoverapi"
|
"github.com/docker/docker/libnetwork/discoverapi"
|
||||||
"github.com/docker/docker/libnetwork/driverapi"
|
"github.com/docker/docker/libnetwork/driverapi"
|
||||||
"github.com/docker/docker/libnetwork/drvregistry"
|
"github.com/docker/docker/libnetwork/drvregistry"
|
||||||
"github.com/docker/docker/libnetwork/hostdiscovery"
|
|
||||||
"github.com/docker/docker/libnetwork/ipamapi"
|
"github.com/docker/docker/libnetwork/ipamapi"
|
||||||
"github.com/docker/docker/libnetwork/netlabel"
|
"github.com/docker/docker/libnetwork/netlabel"
|
||||||
"github.com/docker/docker/libnetwork/options"
|
"github.com/docker/docker/libnetwork/options"
|
||||||
"github.com/docker/docker/libnetwork/osl"
|
"github.com/docker/docker/libnetwork/osl"
|
||||||
"github.com/docker/docker/libnetwork/types"
|
"github.com/docker/docker/libnetwork/types"
|
||||||
"github.com/docker/docker/pkg/discovery"
|
|
||||||
"github.com/docker/docker/pkg/plugingetter"
|
"github.com/docker/docker/pkg/plugingetter"
|
||||||
"github.com/docker/docker/pkg/plugins"
|
"github.com/docker/docker/pkg/plugins"
|
||||||
"github.com/docker/docker/pkg/stringid"
|
"github.com/docker/docker/pkg/stringid"
|
||||||
|
@ -161,7 +159,6 @@ type controller struct {
|
||||||
sandboxes sandboxTable
|
sandboxes sandboxTable
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
stores []datastore.DataStore
|
stores []datastore.DataStore
|
||||||
discovery hostdiscovery.HostDiscovery
|
|
||||||
extKeyListener net.Listener
|
extKeyListener net.Listener
|
||||||
watchCh chan *endpoint
|
watchCh chan *endpoint
|
||||||
unWatchCh chan *endpoint
|
unWatchCh chan *endpoint
|
||||||
|
@ -228,14 +225,6 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
|
||||||
|
|
||||||
c.drvRegistry = drvRegistry
|
c.drvRegistry = drvRegistry
|
||||||
|
|
||||||
if c.cfg != nil && c.cfg.Cluster.Watcher != nil {
|
|
||||||
if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
|
|
||||||
// Failing to initialize discovery is a bad situation to be in.
|
|
||||||
// But it cannot fail creating the Controller
|
|
||||||
logrus.Errorf("Failed to Initialize Discovery : %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.WalkNetworks(populateSpecial)
|
c.WalkNetworks(populateSpecial)
|
||||||
|
|
||||||
// Reserve pools first before doing cleanup. Otherwise the
|
// Reserve pools first before doing cleanup. Otherwise the
|
||||||
|
@ -518,13 +507,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
|
|
||||||
if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
|
|
||||||
logrus.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -564,30 +546,6 @@ func (c *controller) clusterHostID() string {
|
||||||
return addr[0]
|
return addr[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) initDiscovery(watcher discovery.Watcher) error {
|
|
||||||
if c.cfg == nil {
|
|
||||||
return fmt.Errorf("discovery initialization requires a valid configuration")
|
|
||||||
}
|
|
||||||
|
|
||||||
c.discovery = hostdiscovery.NewHostDiscovery(watcher)
|
|
||||||
return c.discovery.Watch(c.activeCallback, c.hostJoinCallback, c.hostLeaveCallback)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) activeCallback() {
|
|
||||||
ds := c.getStore(datastore.GlobalScope)
|
|
||||||
if ds != nil && !ds.Active() {
|
|
||||||
ds.RestartWatch()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) hostJoinCallback(nodes []net.IP) {
|
|
||||||
c.processNodeDiscovery(nodes, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) hostLeaveCallback(nodes []net.IP) {
|
|
||||||
c.processNodeDiscovery(nodes, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) processNodeDiscovery(nodes []net.IP, add bool) {
|
func (c *controller) processNodeDiscovery(nodes []net.IP, add bool) {
|
||||||
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
|
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
|
||||||
c.pushNodeDiscovery(driver, capability, nodes, add)
|
c.pushNodeDiscovery(driver, capability, nodes, add)
|
||||||
|
@ -662,14 +620,6 @@ func (c *controller) GetPluginGetter() plugingetter.PluginGetter {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, capability driverapi.Capability) error {
|
func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, capability driverapi.Capability) error {
|
||||||
c.Lock()
|
|
||||||
hd := c.discovery
|
|
||||||
c.Unlock()
|
|
||||||
|
|
||||||
if hd != nil {
|
|
||||||
c.pushNodeDiscovery(driver, capability, hd.Fetch(), true)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.agentDriverNotify(driver)
|
c.agentDriverNotify(driver)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,122 +0,0 @@
|
||||||
package hostdiscovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
mapset "github.com/deckarep/golang-set"
|
|
||||||
"github.com/docker/docker/pkg/discovery"
|
|
||||||
|
|
||||||
// Including KV
|
|
||||||
"github.com/docker/docker/libnetwork/types"
|
|
||||||
_ "github.com/docker/docker/pkg/discovery/kv" // register all the things with host discovery
|
|
||||||
"github.com/docker/libkv/store/consul"
|
|
||||||
"github.com/docker/libkv/store/etcd"
|
|
||||||
"github.com/docker/libkv/store/zookeeper"
|
|
||||||
)
|
|
||||||
|
|
||||||
type hostDiscovery struct {
|
|
||||||
watcher discovery.Watcher
|
|
||||||
nodes mapset.Set
|
|
||||||
stopChan chan struct{}
|
|
||||||
sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
consul.Register()
|
|
||||||
etcd.Register()
|
|
||||||
zookeeper.Register()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHostDiscovery function creates a host discovery object
|
|
||||||
func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
|
|
||||||
return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
|
|
||||||
h.Lock()
|
|
||||||
d := h.watcher
|
|
||||||
h.Unlock()
|
|
||||||
if d == nil {
|
|
||||||
return types.BadRequestErrorf("invalid discovery watcher")
|
|
||||||
}
|
|
||||||
discoveryCh, errCh := d.Watch(h.stopChan)
|
|
||||||
go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
|
|
||||||
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case entries := <-ch:
|
|
||||||
h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
|
|
||||||
case err := <-errCh:
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("discovery error: %v", err)
|
|
||||||
}
|
|
||||||
case <-h.stopChan:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *hostDiscovery) StopDiscovery() error {
|
|
||||||
h.Lock()
|
|
||||||
stopChan := h.stopChan
|
|
||||||
h.watcher = nil
|
|
||||||
h.Unlock()
|
|
||||||
|
|
||||||
close(stopChan)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *hostDiscovery) processCallback(entries discovery.Entries,
|
|
||||||
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
|
|
||||||
updated := hosts(entries)
|
|
||||||
h.Lock()
|
|
||||||
existing := h.nodes
|
|
||||||
added, removed := diff(existing, updated)
|
|
||||||
h.nodes = updated
|
|
||||||
h.Unlock()
|
|
||||||
|
|
||||||
activeCallback()
|
|
||||||
if len(added) > 0 {
|
|
||||||
joinCallback(added)
|
|
||||||
}
|
|
||||||
if len(removed) > 0 {
|
|
||||||
leaveCallback(removed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
|
|
||||||
addSlice := updated.Difference(existing).ToSlice()
|
|
||||||
removeSlice := existing.Difference(updated).ToSlice()
|
|
||||||
for _, ip := range addSlice {
|
|
||||||
added = append(added, net.ParseIP(ip.(string)))
|
|
||||||
}
|
|
||||||
for _, ip := range removeSlice {
|
|
||||||
removed = append(removed, net.ParseIP(ip.(string)))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *hostDiscovery) Fetch() []net.IP {
|
|
||||||
h.Lock()
|
|
||||||
defer h.Unlock()
|
|
||||||
ips := []net.IP{}
|
|
||||||
for _, ipstr := range h.nodes.ToSlice() {
|
|
||||||
ips = append(ips, net.ParseIP(ipstr.(string)))
|
|
||||||
}
|
|
||||||
return ips
|
|
||||||
}
|
|
||||||
|
|
||||||
func hosts(entries discovery.Entries) mapset.Set {
|
|
||||||
hosts := mapset.NewSet()
|
|
||||||
for _, entry := range entries {
|
|
||||||
hosts.Add(entry.Host)
|
|
||||||
}
|
|
||||||
return hosts
|
|
||||||
}
|
|
|
@ -1,22 +0,0 @@
|
||||||
package hostdiscovery
|
|
||||||
|
|
||||||
import "net"
|
|
||||||
|
|
||||||
// JoinCallback provides a callback event for new node joining the cluster
|
|
||||||
type JoinCallback func(entries []net.IP)
|
|
||||||
|
|
||||||
// ActiveCallback provides a callback event for active discovery event
|
|
||||||
type ActiveCallback func()
|
|
||||||
|
|
||||||
// LeaveCallback provides a callback event for node leaving the cluster
|
|
||||||
type LeaveCallback func(entries []net.IP)
|
|
||||||
|
|
||||||
// HostDiscovery primary interface
|
|
||||||
type HostDiscovery interface {
|
|
||||||
//Watch Node join and leave cluster events
|
|
||||||
Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error
|
|
||||||
// StopDiscovery stops the discovery process
|
|
||||||
StopDiscovery() error
|
|
||||||
// Fetch returns a list of host IPs that are currently discovered
|
|
||||||
Fetch() []net.IP
|
|
||||||
}
|
|
|
@ -1,82 +0,0 @@
|
||||||
package hostdiscovery
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
mapset "github.com/deckarep/golang-set"
|
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/discovery"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestDiff(t *testing.T) {
|
|
||||||
existing := mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
|
|
||||||
addedIP := "3.3.3.3"
|
|
||||||
updated := existing.Clone()
|
|
||||||
updated.Add(addedIP)
|
|
||||||
|
|
||||||
added, removed := diff(existing, updated)
|
|
||||||
if len(added) != 1 {
|
|
||||||
t.Fatalf("Diff failed for an Add update. Expecting 1 element, but got %d elements", len(added))
|
|
||||||
}
|
|
||||||
if added[0].String() != addedIP {
|
|
||||||
t.Fatalf("Expecting : %v, Got : %v", addedIP, added[0])
|
|
||||||
}
|
|
||||||
if len(removed) > 0 {
|
|
||||||
t.Fatalf("Diff failed for remove use-case. Expecting 0 element, but got %d elements", len(removed))
|
|
||||||
}
|
|
||||||
|
|
||||||
updated = mapset.NewSetFromSlice([]interface{}{addedIP})
|
|
||||||
added, removed = diff(existing, updated)
|
|
||||||
if len(removed) != 2 {
|
|
||||||
t.Fatalf("Diff failed for a remove update. Expecting 2 element, but got %d elements", len(removed))
|
|
||||||
}
|
|
||||||
if len(added) != 1 {
|
|
||||||
t.Fatalf("Diff failed for add use-case. Expecting 1 element, but got %d elements", len(added))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddedCallback(t *testing.T) {
|
|
||||||
hd := hostDiscovery{}
|
|
||||||
hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1"})
|
|
||||||
update := []*discovery.Entry{{Host: "1.1.1.1", Port: "0"}, {Host: "2.2.2.2", Port: "0"}}
|
|
||||||
|
|
||||||
added := false
|
|
||||||
removed := false
|
|
||||||
hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
|
|
||||||
if !added {
|
|
||||||
t.Fatal("Expecting an Added callback notification. But none received")
|
|
||||||
}
|
|
||||||
if removed {
|
|
||||||
t.Fatal("Not expecting a Removed callback notification. But received a callback")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemovedCallback(t *testing.T) {
|
|
||||||
hd := hostDiscovery{}
|
|
||||||
hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
|
|
||||||
update := []*discovery.Entry{{Host: "1.1.1.1", Port: "0"}}
|
|
||||||
|
|
||||||
added := false
|
|
||||||
removed := false
|
|
||||||
hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
|
|
||||||
if added {
|
|
||||||
t.Fatal("Not expecting an Added callback notification. But received a callback")
|
|
||||||
}
|
|
||||||
if !removed {
|
|
||||||
t.Fatal("Expecting a Removed callback notification. But none received")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNoCallback(t *testing.T) {
|
|
||||||
hd := hostDiscovery{}
|
|
||||||
hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"})
|
|
||||||
update := []*discovery.Entry{{Host: "1.1.1.1", Port: "0"}, {Host: "2.2.2.2", Port: "0"}}
|
|
||||||
|
|
||||||
added := false
|
|
||||||
removed := false
|
|
||||||
hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
|
|
||||||
if added || removed {
|
|
||||||
t.Fatal("Not expecting any callback notification. But received a callback")
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,6 +0,0 @@
|
||||||
title = "LibNetwork Configuration file"
|
|
||||||
|
|
||||||
[cluster]
|
|
||||||
discovery = "consul://localhost:8500"
|
|
||||||
Address = "6.5.5.5"
|
|
||||||
Heartbeat = 3
|
|
Loading…
Reference in New Issue