From ae8643748d8e3ff56bfd923624cb16162af19e57 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Fri, 15 May 2015 15:23:59 -0700 Subject: [PATCH] Libnetwork Host Discovery using Swarm Discovery pkg Signed-off-by: Madhu Venugopal --- libnetwork/cmd/test/libnetwork.toml | 4 +- libnetwork/config/config_test.go | 4 +- libnetwork/controller.go | 46 ++++- libnetwork/datastore/datastore.go | 9 + libnetwork/hostdiscovery/hostdiscovery.go | 164 ++++++++++++++++++ .../hostdiscovery/hostdiscovery_test.go | 140 +++++++++++++++ libnetwork/hostdiscovery/libnetwork.toml | 6 + libnetwork/network.go | 3 +- 8 files changed, 367 insertions(+), 9 deletions(-) create mode 100644 libnetwork/hostdiscovery/hostdiscovery.go create mode 100644 libnetwork/hostdiscovery/hostdiscovery_test.go create mode 100644 libnetwork/hostdiscovery/libnetwork.toml diff --git a/libnetwork/cmd/test/libnetwork.toml b/libnetwork/cmd/test/libnetwork.toml index 93a2ff4756..eac50c3665 100644 --- a/libnetwork/cmd/test/libnetwork.toml +++ b/libnetwork/cmd/test/libnetwork.toml @@ -3,8 +3,8 @@ title = "LibNetwork Configuration file" [daemon] debug = false [cluster] - discovery = "token://swarm-discovery-token" - Address = "Cluster-wide reachable Host IP" + discovery = "token://ce5b9756aeab50fe8fda02624f093d1c" + Address = "1.1.1.1:90" [datastore] embedded = false [datastore.client] diff --git a/libnetwork/config/config_test.go b/libnetwork/config/config_test.go index 702c8efefa..8257270e2a 100644 --- a/libnetwork/config/config_test.go +++ b/libnetwork/config/config_test.go @@ -2,6 +2,8 @@ package config import ( "testing" + + _ "github.com/docker/libnetwork/netutils" ) func TestInvalidConfig(t *testing.T) { @@ -12,7 +14,7 @@ func TestInvalidConfig(t *testing.T) { } func TestConfig(t *testing.T) { - cfg, err := ParseConfig("libnetwork.toml") + _, err := ParseConfig("libnetwork.toml") if err != nil { t.Fatal("Error parsing a valid configuration file :", err) } diff --git a/libnetwork/controller.go b/libnetwork/controller.go index c1a5f909e6..2a38ced5a1 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -47,6 +47,8 @@ package libnetwork import ( "encoding/json" + "fmt" + "net" "os" "strings" "sync" @@ -57,6 +59,7 @@ import ( "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/hostdiscovery" "github.com/docker/libnetwork/sandbox" "github.com/docker/libnetwork/types" "github.com/docker/swarm/pkg/store" @@ -123,6 +126,11 @@ func New(configFile string) (NetworkController, error) { // But it cannot fail creating the Controller log.Warnf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err) } + if err := c.initDiscovery(); err != nil { + // Failing to initalize discovery is a bad situation to be in. + // But it cannot fail creating the Controller + log.Warnf("Failed to Initialize Discovery : %v", err) + } } else { // Missing Configuration file is not a failure scenario // But without that, datastore cannot be initialized. @@ -156,6 +164,10 @@ func (c *controller) initConfig(configFile string) error { } func (c *controller) initDataStore() error { + if c.cfg == nil { + return fmt.Errorf("datastore initialization requires a valid configuration") + } + store, err := datastore.NewDataStore(&c.cfg.Datastore) if err != nil { return err @@ -168,6 +180,21 @@ func (c *controller) initDataStore() error { return nil } +func (c *controller) initDiscovery() error { + if c.cfg == nil { + return fmt.Errorf("discovery initialization requires a valid configuration") + } + + hostDiscovery := hostdiscovery.NewHostDiscovery() + return hostDiscovery.StartDiscovery(&c.cfg.Cluster, c.hostJoinCallback, c.hostLeaveCallback) +} + +func (c *controller) hostJoinCallback(hosts []net.IP) { +} + +func (c *controller) hostLeaveCallback(hosts []net.IP) { +} + func (c *controller) ConfigureNetworkDriver(networkType string, options map[string]interface{}) error { c.Lock() d, ok := c.drivers[networkType] @@ -260,18 +287,22 @@ func (c *controller) addNetworkToStore(n *network) error { if isReservedNetwork(n.Name()) { return nil } - if c.store == nil { - return ErrInvalidDatastore + c.Lock() + cs := c.store + c.Unlock() + if cs == nil { + log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name()) + return nil } - return c.store.PutObjectAtomic(n) + return cs.PutObjectAtomic(n) } func (c *controller) watchNewNetworks() { c.Lock() - store = c.store + cs := c.store c.Unlock() - store.KVStore().WatchRange(datastore.Key("network"), "", 0, func(kvi []store.KVEntry) { + cs.KVStore().WatchRange(datastore.Key(datastore.NetworkKeyPrefix), "", 0, func(kvi []store.KVEntry) { for _, kve := range kvi { var n network err := json.Unmarshal(kve.Value(), &n) @@ -286,7 +317,12 @@ func (c *controller) watchNewNetworks() { if ok && existing.dbIndex == n.dbIndex { // Skip any watch notification for a network that has not changed continue + } else if ok { + // Received an update for an existing network object + log.Debugf("Skipping network update for %s (%s)", n.name, n.id) + continue } + c.newNetworkFromStore(&n) } }) diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index db8fe3890f..3199deab02 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -24,12 +24,21 @@ type datastore struct { //KV Key Value interface used by objects to be part of the DataStore type KV interface { + // Key method lets an object to provide the Key to be used in KV Store Key() []string + // Value method lets an object to marshal its content to be stored in the KV store Value() []byte + // Index method returns the latest DB Index as seen by the object Index() uint64 + // SetIndex method allows the datastore to store the latest DB Index into the object SetIndex(uint64) } +const ( + // NetworkKeyPrefix is the prefix for network key in the kv store + NetworkKeyPrefix = "network" +) + //Key provides convenient method to create a Key func Key(key ...string) string { keychain := []string{"docker", "libnetwork"} diff --git a/libnetwork/hostdiscovery/hostdiscovery.go b/libnetwork/hostdiscovery/hostdiscovery.go new file mode 100644 index 0000000000..550cb95fda --- /dev/null +++ b/libnetwork/hostdiscovery/hostdiscovery.go @@ -0,0 +1,164 @@ +package hostdiscovery + +import ( + "errors" + "fmt" + "net" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + + mapset "github.com/deckarep/golang-set" + "github.com/docker/libnetwork/config" + "github.com/docker/swarm/discovery" + // Anonymous import will be removed after we upgrade to latest swarm + _ "github.com/docker/swarm/discovery/file" + // Anonymous import will be removed after we upgrade to latest swarm + _ "github.com/docker/swarm/discovery/kv" + // Anonymous import will be removed after we upgrade to latest swarm + _ "github.com/docker/swarm/discovery/nodes" + // Anonymous import will be removed after we upgrade to latest swarm + _ "github.com/docker/swarm/discovery/token" +) + +const defaultHeartbeat = 10 + +// JoinCallback provides a callback event for new node joining the cluster +type JoinCallback func(entries []net.IP) + +// LeaveCallback provides a callback event for node leaving the cluster +type LeaveCallback func(entries []net.IP) + +// HostDiscovery primary interface +type HostDiscovery interface { + // StartDiscovery initiates the discovery process and provides appropriate callbacks + StartDiscovery(*config.ClusterCfg, JoinCallback, LeaveCallback) error + // StopDiscovery stops the discovery perocess + StopDiscovery() error + // Fetch returns a list of host IPs that are currently discovered + Fetch() ([]net.IP, error) +} + +type hostDiscovery struct { + discovery discovery.Discovery + nodes mapset.Set + stopChan chan struct{} + sync.Mutex +} + +// NewHostDiscovery function creates a host discovery object +func NewHostDiscovery() HostDiscovery { + return &hostDiscovery{nodes: mapset.NewSet(), stopChan: make(chan struct{})} +} + +func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback JoinCallback, leaveCallback LeaveCallback) error { + if cfg == nil { + return fmt.Errorf("discovery requires a valid configuration") + } + + hb := cfg.Heartbeat + if hb == 0 { + hb = defaultHeartbeat + } + d, err := discovery.New(cfg.Discovery, hb) + if err != nil { + return err + } + + if ip := net.ParseIP(cfg.Address); ip == nil { + return errors.New("Address config should be either ipv4 or ipv6 address") + } + + if err := d.Register(cfg.Address + ":0"); err != nil { + return err + } + + h.Lock() + h.discovery = d + h.Unlock() + + go d.Watch(func(entries []*discovery.Entry) { + h.processCallback(entries, joinCallback, leaveCallback) + }) + + go sustainHeartbeat(d, hb, cfg, h.stopChan) + return nil +} + +func (h *hostDiscovery) StopDiscovery() error { + h.Lock() + stopChan := h.stopChan + h.discovery = nil + h.Unlock() + + close(stopChan) + return nil +} + +func sustainHeartbeat(d discovery.Discovery, hb uint64, config *config.ClusterCfg, stopChan chan struct{}) { + for { + select { + case <-stopChan: + return + case <-time.After(time.Duration(hb) * time.Second): + if err := d.Register(config.Address + ":0"); err != nil { + log.Warn(err) + } + } + } +} + +func (h *hostDiscovery) processCallback(entries []*discovery.Entry, joinCallback JoinCallback, leaveCallback LeaveCallback) { + updated := hosts(entries) + h.Lock() + existing := h.nodes + added, removed := diff(existing, updated) + h.nodes = updated + h.Unlock() + + if len(added) > 0 { + joinCallback(added) + } + if len(removed) > 0 { + leaveCallback(removed) + } +} + +func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) { + addSlice := updated.Difference(existing).ToSlice() + removeSlice := existing.Difference(updated).ToSlice() + for _, ip := range addSlice { + added = append(added, net.ParseIP(ip.(string))) + } + for _, ip := range removeSlice { + removed = append(removed, net.ParseIP(ip.(string))) + } + return +} + +func (h *hostDiscovery) Fetch() ([]net.IP, error) { + h.Lock() + hd := h.discovery + h.Unlock() + if hd == nil { + return nil, errors.New("No Active Discovery") + } + entries, err := hd.Fetch() + if err != nil { + return nil, err + } + ips := []net.IP{} + for _, entry := range entries { + ips = append(ips, net.ParseIP(entry.Host)) + } + return ips, nil +} + +func hosts(entries []*discovery.Entry) mapset.Set { + hosts := mapset.NewSet() + for _, entry := range entries { + hosts.Add(entry.Host) + } + return hosts +} diff --git a/libnetwork/hostdiscovery/hostdiscovery_test.go b/libnetwork/hostdiscovery/hostdiscovery_test.go new file mode 100644 index 0000000000..90eca9d469 --- /dev/null +++ b/libnetwork/hostdiscovery/hostdiscovery_test.go @@ -0,0 +1,140 @@ +package hostdiscovery + +import ( + "net" + "testing" + "time" + + mapset "github.com/deckarep/golang-set" + _ "github.com/docker/libnetwork/netutils" + + "github.com/docker/libnetwork/config" + "github.com/docker/swarm/discovery" +) + +func TestDiscovery(t *testing.T) { + _, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80") + if err != nil { + t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com") + } + + hd := NewHostDiscovery() + config, err := config.ParseConfig("libnetwork.toml") + if err != nil { + t.Fatal(err) + } + + err = hd.StartDiscovery(&config.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {}) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Duration(config.Cluster.Heartbeat*2) * time.Second) + hosts, err := hd.Fetch() + if err != nil { + t.Fatal(err) + } + + found := false + for _, ip := range hosts { + if ip.Equal(net.ParseIP(config.Cluster.Address)) { + found = true + } + } + if !found { + t.Fatalf("Expecting hosts. But none discovered ") + } + err = hd.StopDiscovery() + if err != nil { + t.Fatal(err) + } +} + +func TestBadDiscovery(t *testing.T) { + _, err := net.Dial("tcp", "discovery-stage.hub.docker.com:80") + if err != nil { + t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com") + } + + hd := NewHostDiscovery() + cfg := &config.Config{} + cfg.Cluster.Discovery = "" + err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {}) + if err == nil { + t.Fatal("Invalid discovery configuration must fail") + } + cfg, err = config.ParseConfig("libnetwork.toml") + if err != nil { + t.Fatal(err) + } + cfg.Cluster.Address = "invalid" + err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {}) + if err == nil { + t.Fatal("Invalid discovery address configuration must fail") + } +} + +func TestDiff(t *testing.T) { + existing := mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"}) + addedIP := "3.3.3.3" + updated := existing.Clone() + updated.Add(addedIP) + + added, removed := diff(existing, updated) + if len(added) != 1 { + t.Fatalf("Diff failed for an Add update. Expecting 1 element, but got %d elements", len(added)) + } + if added[0].String() != addedIP { + t.Fatalf("Expecting : %v, Got : %v", addedIP, added[0]) + } + if len(removed) > 0 { + t.Fatalf("Diff failed for remove use-case. Expecting 0 element, but got %d elements", len(removed)) + } + + updated = mapset.NewSetFromSlice([]interface{}{addedIP}) + added, removed = diff(existing, updated) + if len(removed) != 2 { + t.Fatalf("Diff failed for an remove update. Expecting 2 element, but got %d elements", len(removed)) + } + if len(added) != 1 { + t.Fatalf("Diff failed for add use-case. Expecting 1 element, but got %d elements", len(added)) + } +} + +func TestAddedCallback(t *testing.T) { + hd := hostDiscovery{} + hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1"}) + update := []*discovery.Entry{&discovery.Entry{Host: "1.1.1.1", Port: "0"}, &discovery.Entry{Host: "2.2.2.2", Port: "0"}} + + added := false + removed := false + hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) + if !added { + t.Fatalf("Expecting a Added callback notification. But none received") + } +} + +func TestRemovedCallback(t *testing.T) { + hd := hostDiscovery{} + hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"}) + update := []*discovery.Entry{&discovery.Entry{Host: "1.1.1.1", Port: "0"}} + + added := false + removed := false + hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) + if !removed { + t.Fatalf("Expecting a Removed callback notification. But none received") + } +} + +func TestNoCallback(t *testing.T) { + hd := hostDiscovery{} + hd.nodes = mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"}) + update := []*discovery.Entry{&discovery.Entry{Host: "1.1.1.1", Port: "0"}, &discovery.Entry{Host: "2.2.2.2", Port: "0"}} + + added := false + removed := false + hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) + if added || removed { + t.Fatalf("Not expecting any callback notification. But received a callback") + } +} diff --git a/libnetwork/hostdiscovery/libnetwork.toml b/libnetwork/hostdiscovery/libnetwork.toml new file mode 100644 index 0000000000..b8c6854103 --- /dev/null +++ b/libnetwork/hostdiscovery/libnetwork.toml @@ -0,0 +1,6 @@ +title = "LibNetwork Configuration file" + +[cluster] + discovery = "token://08469efb104bce980931ed24c8eb03a2" + Address = "1.1.1.1" + Heartbeat = 3 diff --git a/libnetwork/network.go b/libnetwork/network.go index 6dc9ea2a92..7b15ffc9fa 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/docker/docker/pkg/stringid" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" @@ -79,7 +80,7 @@ func (n *network) Type() string { } func (n *network) Key() []string { - return []string{"network", string(n.id)} + return []string{datastore.NetworkKeyPrefix, string(n.id)} } func (n *network) Value() []byte {