From 6e4a5725297cf5ee2a3384935f491a2db90689d3 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Wed, 10 Jun 2015 14:24:19 -0700 Subject: [PATCH] Overlay driver This commit brings in the first implementation of overlay driver which makes use of vxlan tunneling protocol to create logical networks across multiple hosts. This is very much alpha code and should be used for demo and testing purposes only. Signed-off-by: Jana Radhakrishnan --- libnetwork/drivers/overlay/joinleave.go | 99 +++++++ libnetwork/drivers/overlay/ov_endpoint.go | 110 +++++++ libnetwork/drivers/overlay/ov_network.go | 325 +++++++++++++++++++++ libnetwork/drivers/overlay/ov_serf.go | 249 ++++++++++++++++ libnetwork/drivers/overlay/ov_utils.go | 80 +++++ libnetwork/drivers/overlay/overlay.go | 157 ++++++++++ libnetwork/drivers/overlay/overlay_test.go | 130 +++++++++ libnetwork/drivers/overlay/peerdb.go | 280 ++++++++++++++++++ libnetwork/drivers_linux.go | 2 + libnetwork/netlabel/labels.go | 28 ++ 10 files changed, 1460 insertions(+) create mode 100644 libnetwork/drivers/overlay/joinleave.go create mode 100644 libnetwork/drivers/overlay/ov_endpoint.go create mode 100644 libnetwork/drivers/overlay/ov_network.go create mode 100644 libnetwork/drivers/overlay/ov_serf.go create mode 100644 libnetwork/drivers/overlay/ov_utils.go create mode 100644 libnetwork/drivers/overlay/overlay.go create mode 100644 libnetwork/drivers/overlay/overlay_test.go create mode 100644 libnetwork/drivers/overlay/peerdb.go diff --git a/libnetwork/drivers/overlay/joinleave.go b/libnetwork/drivers/overlay/joinleave.go new file mode 100644 index 0000000000..474970bcf5 --- /dev/null +++ b/libnetwork/drivers/overlay/joinleave.go @@ -0,0 +1,99 @@ +package overlay + +import ( + "fmt" + + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/types" + "github.com/vishvananda/netlink" +) + +// Join method is invoked when a Sandbox is attached to an endpoint. +func (d *driver) Join(nid, eid types.UUID, sboxKey string, jinfo driverapi.JoinInfo, options map[string]interface{}) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("could not find endpoint with id %s", eid) + } + + if err := n.joinSandbox(); err != nil { + return fmt.Errorf("network sandbox join failed: %v", + err) + } + + sbox := n.sandbox() + + name1, name2, err := createVethPair() + if err != nil { + return err + } + + if err := sbox.AddInterface(name1, "veth", + sbox.InterfaceOptions().Master("bridge1")); err != nil { + return fmt.Errorf("could not add veth pair inside the network sandbox: %v", err) + } + + veth, err := netlink.LinkByName(name2) + if err != nil { + return fmt.Errorf("could not find link by name %s: %v", name2, err) + } + + if err := netlink.LinkSetHardwareAddr(veth, ep.mac); err != nil { + return fmt.Errorf("could not set mac address to the container interface: %v", err) + } + + for _, iNames := range jinfo.InterfaceNames() { + // Make sure to set names on the correct interface ID. + if iNames.ID() == 1 { + err = iNames.SetNames(name2, "eth") + if err != nil { + return err + } + } + } + + err = jinfo.SetGateway(bridgeIP.IP) + if err != nil { + return err + } + + d.peerDbAdd(nid, eid, ep.addr.IP, ep.mac, + d.serfInstance.LocalMember().Addr, true) + d.notifyCh <- ovNotify{ + action: "join", + nid: nid, + eid: eid, + } + + return nil +} + +// Leave method is invoked when a Sandbox detaches from an endpoint. +func (d *driver) Leave(nid, eid types.UUID) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + d.notifyCh <- ovNotify{ + action: "leave", + nid: nid, + eid: eid, + } + + n.leaveSandbox() + + return nil +} diff --git a/libnetwork/drivers/overlay/ov_endpoint.go b/libnetwork/drivers/overlay/ov_endpoint.go new file mode 100644 index 0000000000..cbcb6074aa --- /dev/null +++ b/libnetwork/drivers/overlay/ov_endpoint.go @@ -0,0 +1,110 @@ +package overlay + +import ( + "encoding/binary" + "fmt" + "net" + + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/netutils" + "github.com/docker/libnetwork/types" +) + +type endpointTable map[types.UUID]*endpoint + +type endpoint struct { + id types.UUID + mac net.HardwareAddr + addr *net.IPNet +} + +func (n *network) endpoint(eid types.UUID) *endpoint { + n.Lock() + defer n.Unlock() + + return n.endpoints[eid] +} + +func (n *network) addEndpoint(ep *endpoint) { + n.Lock() + n.endpoints[ep.id] = ep + n.Unlock() +} + +func (n *network) deleteEndpoint(eid types.UUID) { + n.Lock() + delete(n.endpoints, eid) + n.Unlock() +} + +func (d *driver) CreateEndpoint(nid, eid types.UUID, epInfo driverapi.EndpointInfo, + epOptions map[string]interface{}) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("network id %q not found", nid) + } + + ep := &endpoint{ + id: eid, + } + + if epInfo != nil && (len(epInfo.Interfaces()) > 0) { + addr := epInfo.Interfaces()[0].Address() + ep.addr = &addr + ep.mac = epInfo.Interfaces()[0].MacAddress() + n.addEndpoint(ep) + return nil + } + + ipID, err := d.ipAllocator.GetID() + if err != nil { + return fmt.Errorf("could not allocate ip from subnet %s: %v", + bridgeSubnet.String(), err) + } + + ep.addr = &net.IPNet{ + Mask: bridgeSubnet.Mask, + } + ep.addr.IP = make([]byte, 4) + + binary.BigEndian.PutUint32(ep.addr.IP, bridgeSubnetInt+ipID) + + ep.mac = netutils.GenerateRandomMAC() + + err = epInfo.AddInterface(1, ep.mac, *ep.addr, net.IPNet{}) + if err != nil { + return fmt.Errorf("could not add interface to endpoint info: %v", err) + } + + n.addEndpoint(ep) + + return nil +} + +func (d *driver) DeleteEndpoint(nid, eid types.UUID) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("network id %q not found", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("endpoint id %q not found", eid) + } + + d.ipAllocator.Release(binary.BigEndian.Uint32(ep.addr.IP) - bridgeSubnetInt) + n.deleteEndpoint(eid) + return nil +} + +func (d *driver) EndpointOperInfo(nid, eid types.UUID) (map[string]interface{}, error) { + return make(map[string]interface{}, 0), nil +} diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go new file mode 100644 index 0000000000..94caa941b8 --- /dev/null +++ b/libnetwork/drivers/overlay/ov_network.go @@ -0,0 +1,325 @@ +package overlay + +import ( + "encoding/json" + "fmt" + "net" + "sync" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/ipallocator" + "github.com/docker/libnetwork/sandbox" + "github.com/docker/libnetwork/types" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netlink/nl" +) + +type networkTable map[types.UUID]*network + +type network struct { + id types.UUID + vni uint32 + dbIndex uint64 + sbox sandbox.Sandbox + endpoints endpointTable + ipAllocator *ipallocator.IPAllocator + gw net.IP + vxlanName string + driver *driver + joinCnt int + sync.Mutex +} + +func (d *driver) CreateNetwork(id types.UUID, option map[string]interface{}) error { + if id == "" { + return fmt.Errorf("invalid network id") + } + + n := &network{ + id: id, + driver: d, + endpoints: endpointTable{}, + } + + n.gw = bridgeIP.IP + + d.addNetwork(n) + + if err := n.obtainVxlanID(); err != nil { + return err + } + + return nil +} + +func (d *driver) DeleteNetwork(nid types.UUID) error { + if nid == "" { + return fmt.Errorf("invalid network id") + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + d.deleteNetwork(nid) + + return n.releaseVxlanID() +} + +func (n *network) joinSandbox() error { + n.Lock() + if n.joinCnt != 0 { + n.joinCnt++ + n.Unlock() + return nil + } + n.joinCnt++ + n.Unlock() + + return n.initSandbox() +} + +func (n *network) leaveSandbox() { + n.Lock() + n.joinCnt-- + if n.joinCnt != 0 { + n.Unlock() + return + } + n.Unlock() + + n.destroySandbox() +} + +func (n *network) destroySandbox() { + sbox := n.sandbox() + if sbox != nil { + for _, iface := range sbox.Info().Interfaces() { + iface.Remove() + } + + if err := deleteVxlan(n.vxlanName); err != nil { + logrus.Warnf("could not cleanup sandbox properly: %v", err) + } + + sbox.Destroy() + } +} + +func (n *network) initSandbox() error { + sbox, err := sandbox.NewSandbox(sandbox.GenerateKey(string(n.id)), true) + if err != nil { + return fmt.Errorf("could not create network sandbox: %v", err) + } + + // Add a bridge inside the namespace + if err := sbox.AddInterface("bridge1", "br", + sbox.InterfaceOptions().Address(bridgeIP), + sbox.InterfaceOptions().Bridge(true)); err != nil { + return fmt.Errorf("could not create bridge inside the network sandbox: %v", err) + } + + vxlanName, err := createVxlan(n.vxlanID()) + if err != nil { + return err + } + + if err := sbox.AddInterface(vxlanName, "vxlan", + sbox.InterfaceOptions().Master("bridge1")); err != nil { + return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v", + err) + } + + n.vxlanName = vxlanName + + n.setSandbox(sbox) + + n.driver.peerDbUpdateSandbox(n.id) + + var nlSock *nl.NetlinkSocket + sbox.InvokeFunc(func() { + nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH) + if err != nil { + err = fmt.Errorf("failed to subscribe to neighbor group netlink messages") + } + }) + + go n.watchMiss(nlSock) + + return nil +} + +func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { + for { + msgs, err := nlSock.Recieve() + if err != nil { + logrus.Errorf("Failed to receive from netlink: %v ", err) + continue + } + + for _, msg := range msgs { + if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH { + continue + } + + neigh, err := netlink.NeighDeserialize(msg.Data) + if err != nil { + logrus.Errorf("Failed to deserialize netlink ndmsg: %v", err) + continue + } + + if neigh.IP.To16() != nil { + continue + } + + if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 { + continue + } + + mac, vtep, err := n.driver.resolvePeer(n.id, neigh.IP) + if err != nil { + logrus.Errorf("could not resolve peer %q: %v", neigh.IP, err) + continue + } + + if err := n.driver.peerAdd(n.id, types.UUID("dummy"), neigh.IP, mac, vtep, true); err != nil { + logrus.Errorf("could not add neighbor entry for missed peer: %v", err) + } + } + } +} + +func (d *driver) addNetwork(n *network) { + d.Lock() + d.networks[n.id] = n + d.Unlock() +} + +func (d *driver) deleteNetwork(nid types.UUID) { + d.Lock() + delete(d.networks, nid) + d.Unlock() +} + +func (d *driver) network(nid types.UUID) *network { + d.Lock() + defer d.Unlock() + + return d.networks[nid] +} + +func (n *network) sandbox() sandbox.Sandbox { + n.Lock() + defer n.Unlock() + + return n.sbox +} + +func (n *network) setSandbox(sbox sandbox.Sandbox) { + n.Lock() + n.sbox = sbox + n.Unlock() +} + +func (n *network) vxlanID() uint32 { + n.Lock() + defer n.Unlock() + + return n.vni +} + +func (n *network) setVxlanID(vni uint32) { + n.Lock() + n.vni = vni + n.Unlock() +} + +func (n *network) Key() []string { + return []string{"overlay", "network", string(n.id)} +} + +func (n *network) KeyPrefix() []string { + return []string{"overlay", "network"} +} + +func (n *network) Value() []byte { + b, err := json.Marshal(n.vxlanID()) + if err != nil { + return []byte{} + } + + return b +} + +func (n *network) Index() uint64 { + return n.dbIndex +} + +func (n *network) SetIndex(index uint64) { + n.dbIndex = index +} + +func (n *network) writeToStore() error { + return n.driver.store.PutObjectAtomic(n) +} + +func (n *network) releaseVxlanID() error { + if n.driver.store == nil { + return fmt.Errorf("no datastore configured. cannot release vxlan id") + } + + if n.vxlanID() == 0 { + return nil + } + + if err := n.driver.store.DeleteObjectAtomic(n); err != nil { + if err == datastore.ErrKeyModified || err == datastore.ErrKeyNotFound { + // In both the above cases we can safely assume that the key has been removed by some other + // instance and so simply get out of here + return nil + } + + return fmt.Errorf("failed to delete network to vxlan id map: %v", err) + } + + n.driver.vxlanIdm.Release(n.vxlanID()) + n.setVxlanID(0) + return nil +} + +func (n *network) obtainVxlanID() error { + if n.driver.store == nil { + return fmt.Errorf("no datastore configured. cannot obtain vxlan id") + } + + for { + var vxlanID uint32 + if err := n.driver.store.GetObject(datastore.Key(n.Key()...), + &vxlanID); err != nil { + if err == datastore.ErrKeyNotFound { + vxlanID, err = n.driver.vxlanIdm.GetID() + if err != nil { + return fmt.Errorf("failed to allocate vxlan id: %v", err) + } + + n.setVxlanID(vxlanID) + if err := n.writeToStore(); err != nil { + n.driver.vxlanIdm.Release(n.vxlanID()) + n.setVxlanID(0) + if err == datastore.ErrKeyModified { + continue + } + return fmt.Errorf("failed to update data store with vxlan id: %v", err) + } + return nil + } + return fmt.Errorf("failed to obtain vxlan id from data store: %v", err) + } + + n.setVxlanID(vxlanID) + return nil + } +} diff --git a/libnetwork/drivers/overlay/ov_serf.go b/libnetwork/drivers/overlay/ov_serf.go new file mode 100644 index 0000000000..5f467d19fa --- /dev/null +++ b/libnetwork/drivers/overlay/ov_serf.go @@ -0,0 +1,249 @@ +package overlay + +import ( + "fmt" + "net" + "strings" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/types" + "github.com/hashicorp/serf/serf" +) + +type ovNotify struct { + action string + eid types.UUID + nid types.UUID +} + +type logWriter struct{} + +func (l *logWriter) Write(p []byte) (int, error) { + str := string(p) + + switch { + case strings.Contains(str, "[WARN]"): + logrus.Warn(str) + case strings.Contains(str, "[DEBUG]"): + logrus.Debug(str) + case strings.Contains(str, "[INFO]"): + logrus.Info(str) + case strings.Contains(str, "[ERR]"): + logrus.Error(str) + } + + return len(p), nil +} + +func getBindAddr(ifaceName string) (string, error) { + iface, err := net.InterfaceByName(ifaceName) + if err != nil { + return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err) + } + + addrs, err := iface.Addrs() + if err != nil { + return "", fmt.Errorf("failed to get interface addresses: %v", err) + } + + for _, a := range addrs { + addr, ok := a.(*net.IPNet) + if !ok { + continue + } + addrIP := addr.IP + + if addrIP.IsLinkLocalUnicast() { + continue + } + + return addrIP.String(), nil + } + + return "", fmt.Errorf("failed to get bind address") +} + +func (d *driver) serfInit() error { + var err error + + config := serf.DefaultConfig() + config.Init() + if d.ifaceName != "" { + bindAddr, err := getBindAddr(d.ifaceName) + if err != nil { + return fmt.Errorf("getBindAddr error: %v", err) + } + config.MemberlistConfig.BindAddr = bindAddr + } + + d.eventCh = make(chan serf.Event, 4) + config.EventCh = d.eventCh + config.UserCoalescePeriod = 1 * time.Second + config.UserQuiescentPeriod = 50 * time.Millisecond + + config.LogOutput = logrus.StandardLogger().Out + + s, err := serf.Create(config) + if err != nil { + return fmt.Errorf("failed to create cluster node: %v", err) + } + defer func() { + if err != nil { + s.Shutdown() + } + }() + + if d.neighIP != "" { + if _, err = s.Join([]string{d.neighIP}, false); err != nil { + return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", + d.neighIP, err) + } + } + + d.serfInstance = s + + d.notifyCh = make(chan ovNotify) + d.exitCh = make(chan chan struct{}) + + go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh) + return nil +} + +func (d *driver) notifyEvent(event ovNotify) { + n := d.network(event.nid) + ep := n.endpoint(event.eid) + + ePayload := fmt.Sprintf("%s %s %s", event.action, ep.addr.IP.String(), ep.mac.String()) + eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(), + event.nid, event.eid) + + if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil { + fmt.Printf("Sending user event failed: %v\n", err) + } +} + +func (d *driver) processEvent(u serf.UserEvent) { + fmt.Printf("Received user event name:%s, payload:%s\n", u.Name, + string(u.Payload)) + + var dummy, action, vtepStr, nid, eid, ipStr, macStr string + if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil { + fmt.Printf("Failed to scan name string: %v\n", err) + } + + if _, err := fmt.Sscan(string(u.Payload), &action, + &ipStr, &macStr); err != nil { + fmt.Printf("Failed to scan value string: %v\n", err) + } + + fmt.Printf("Parsed data = %s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, macStr) + + mac, err := net.ParseMAC(macStr) + if err != nil { + fmt.Printf("Failed to parse mac: %v\n", err) + } + + if d.serfInstance.LocalMember().Addr.String() == vtepStr { + return + } + + switch action { + case "join": + if err := d.peerAdd(types.UUID(nid), types.UUID(eid), net.ParseIP(ipStr), mac, + net.ParseIP(vtepStr), true); err != nil { + fmt.Printf("Peer add failed in the driver: %v\n", err) + } + case "leave": + if err := d.peerDelete(types.UUID(nid), types.UUID(eid), net.ParseIP(ipStr), mac, + net.ParseIP(vtepStr), true); err != nil { + fmt.Printf("Peer delete failed in the driver: %v\n", err) + } + } +} + +func (d *driver) processQuery(q *serf.Query) { + fmt.Printf("Received query name:%s, payload:%s\n", q.Name, + string(q.Payload)) + + var nid, ipStr string + if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil { + fmt.Printf("Failed to scan query payload string: %v\n", err) + } + + peerMac, vtep, err := d.peerDbSearch(types.UUID(nid), net.ParseIP(ipStr)) + if err != nil { + return + } + + q.Respond([]byte(fmt.Sprintf("%s %s", peerMac.String(), vtep.String()))) +} + +func (d *driver) resolvePeer(nid types.UUID, peerIP net.IP) (net.HardwareAddr, net.IP, error) { + qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String()) + resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil) + if err != nil { + return nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err) + } + + respCh := resp.ResponseCh() + select { + case r := <-respCh: + var macStr, vtepStr string + if _, err := fmt.Sscan(string(r.Payload), &macStr, &vtepStr); err != nil { + return nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err) + } + + mac, err := net.ParseMAC(macStr) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse mac: %v", err) + } + + return mac, net.ParseIP(vtepStr), nil + + case <-time.After(time.Second): + return nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster") + } +} + +func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, + exitCh chan chan struct{}) { + + for { + select { + case notify, ok := <-notifyCh: + if !ok { + break + } + + d.notifyEvent(notify) + case ch, ok := <-exitCh: + if !ok { + break + } + + if err := d.serfInstance.Leave(); err != nil { + fmt.Printf("failed leaving the cluster: %v\n", err) + } + + d.serfInstance.Shutdown() + close(ch) + return + case e, ok := <-eventCh: + if !ok { + break + } + + if e.EventType() == serf.EventQuery { + d.processQuery(e.(*serf.Query)) + break + } + + u, ok := e.(serf.UserEvent) + if !ok { + break + } + d.processEvent(u) + } + } +} diff --git a/libnetwork/drivers/overlay/ov_utils.go b/libnetwork/drivers/overlay/ov_utils.go new file mode 100644 index 0000000000..2349c7e81f --- /dev/null +++ b/libnetwork/drivers/overlay/ov_utils.go @@ -0,0 +1,80 @@ +package overlay + +import ( + "fmt" + + "github.com/docker/libnetwork/netutils" + "github.com/docker/libnetwork/types" + "github.com/vishvananda/netlink" +) + +func validateID(nid, eid types.UUID) error { + if nid == "" { + return fmt.Errorf("invalid network id") + } + + if eid == "" { + return fmt.Errorf("invalid endpoint id") + } + + return nil +} + +func createVethPair() (string, string, error) { + // Generate a name for what will be the host side pipe interface + name1, err := netutils.GenerateIfaceName(vethPrefix, vethLen) + if err != nil { + return "", "", fmt.Errorf("error generating veth name1: %v", err) + } + + // Generate a name for what will be the sandbox side pipe interface + name2, err := netutils.GenerateIfaceName(vethPrefix, vethLen) + if err != nil { + return "", "", fmt.Errorf("error generating veth name2: %v", err) + } + + // Generate and add the interface pipe host <-> sandbox + veth := &netlink.Veth{ + LinkAttrs: netlink.LinkAttrs{Name: name1, TxQLen: 0}, + PeerName: name2} + if err := netlink.LinkAdd(veth); err != nil { + return "", "", fmt.Errorf("error creating veth pair: %v", err) + } + + return name1, name2, nil +} + +func createVxlan(vni uint32) (string, error) { + name, err := netutils.GenerateIfaceName("vxlan", 7) + if err != nil { + return "", fmt.Errorf("error generating vxlan name: %v", err) + } + + vxlan := &netlink.Vxlan{ + LinkAttrs: netlink.LinkAttrs{Name: name}, + VxlanId: int(vni), + Learning: true, + Proxy: true, + L3miss: true, + L2miss: true, + } + + if err := netlink.LinkAdd(vxlan); err != nil { + return "", fmt.Errorf("error creating vxlan interface: %v", err) + } + + return name, nil +} + +func deleteVxlan(name string) error { + link, err := netlink.LinkByName(name) + if err != nil { + return fmt.Errorf("failed to find vxlan interface with name %s: %v", name, err) + } + + if err := netlink.LinkDel(link); err != nil { + return fmt.Errorf("error deleting vxlan interface: %v", err) + } + + return nil +} diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go new file mode 100644 index 0000000000..f911926980 --- /dev/null +++ b/libnetwork/drivers/overlay/overlay.go @@ -0,0 +1,157 @@ +package overlay + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + + "github.com/docker/libnetwork/config" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/idm" + "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/types" + "github.com/hashicorp/serf/serf" +) + +const ( + networkType = "overlay" + vethPrefix = "veth" + vethLen = 7 + vxlanIDStart = 256 + vxlanIDEnd = 1000 +) + +type driver struct { + eventCh chan serf.Event + notifyCh chan ovNotify + exitCh chan chan struct{} + ifaceName string + neighIP string + peerDb peerNetworkMap + serfInstance *serf.Serf + networks networkTable + store datastore.DataStore + ipAllocator *idm.Idm + vxlanIdm *idm.Idm + sync.Once + sync.Mutex +} + +var ( + bridgeSubnet, bridgeIP *net.IPNet + once sync.Once + bridgeSubnetInt uint32 +) + +func onceInit() { + var err error + _, bridgeSubnet, err = net.ParseCIDR("172.21.0.0/16") + if err != nil { + panic("could not parse cid 172.21.0.0/16") + } + + bridgeSubnetInt = binary.BigEndian.Uint32(bridgeSubnet.IP.To4()) + + ip, subnet, err := net.ParseCIDR("172.21.255.254/16") + if err != nil { + panic("could not parse cid 172.21.255.254/16") + } + + bridgeIP = &net.IPNet{ + IP: ip, + Mask: subnet.Mask, + } +} + +// Init registers a new instance of overlay driver +func Init(dc driverapi.DriverCallback) error { + once.Do(onceInit) + + c := driverapi.Capability{ + Scope: driverapi.GlobalScope, + } + + return dc.RegisterDriver(networkType, &driver{ + networks: networkTable{}, + peerDb: peerNetworkMap{ + mp: map[types.UUID]peerMap{}, + }, + }, c) +} + +// Fini cleans up the driver resources +func Fini(drv driverapi.Driver) { + d := drv.(*driver) + + if d.exitCh != nil { + waitCh := make(chan struct{}) + + d.exitCh <- waitCh + + <-waitCh + } +} + +func (d *driver) Config(option map[string]interface{}) error { + var onceDone bool + var err error + + d.Do(func() { + onceDone = true + + if ifaceName, ok := option[netlabel.OverlayBindInterface]; ok { + d.ifaceName = ifaceName.(string) + } + + if neighIP, ok := option[netlabel.OverlayNeighborIP]; ok { + d.neighIP = neighIP.(string) + } + + provider, provOk := option[netlabel.KVProvider] + provURL, urlOk := option[netlabel.KVProviderURL] + + if provOk && urlOk { + cfg := &config.DatastoreCfg{ + Client: config.DatastoreClientCfg{ + Provider: provider.(string), + Address: provURL.(string), + }, + } + d.store, err = datastore.NewDataStore(cfg) + if err != nil { + err = fmt.Errorf("failed to initialize data store: %v", err) + return + } + } + + d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd) + if err != nil { + err = fmt.Errorf("failed to initialize vxlan id manager: %v", err) + return + } + + d.ipAllocator, err = idm.New(d.store, "ipam-id", 1, 0xFFFF-2) + if err != nil { + err = fmt.Errorf("failed to initalize ipam id manager: %v", err) + return + } + + err = d.serfInit() + if err != nil { + err = fmt.Errorf("initializing serf instance failed: %v", err) + } + + }) + + if !onceDone { + return fmt.Errorf("config already applied to driver") + } + + return err +} + +func (d *driver) Type() string { + return networkType +} diff --git a/libnetwork/drivers/overlay/overlay_test.go b/libnetwork/drivers/overlay/overlay_test.go new file mode 100644 index 0000000000..cc22e02f91 --- /dev/null +++ b/libnetwork/drivers/overlay/overlay_test.go @@ -0,0 +1,130 @@ +package overlay + +import ( + "testing" + "time" + + "github.com/docker/libnetwork/driverapi" +) + +type driverTester struct { + t *testing.T + d driverapi.Driver +} + +const testNetworkType = "overlay" + +func setupDriver(t *testing.T) *driverTester { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + opt := make(map[string]interface{}) + if err := dt.d.Config(opt); err != nil { + t.Fatal(err) + } + + return dt +} + +func cleanupDriver(t *testing.T, dt *driverTester) { + ch := make(chan struct{}) + go func() { + Fini(dt.d) + close(ch) + }() + + select { + case <-ch: + case <-time.After(10 * time.Second): + t.Fatal("test timed out because Fini() did not return on time") + } +} + +func (dt *driverTester) RegisterDriver(name string, drv driverapi.Driver, + cap driverapi.Capability) error { + if name != testNetworkType { + dt.t.Fatalf("Expected driver register name to be %q. Instead got %q", + testNetworkType, name) + } + + if _, ok := drv.(*driver); !ok { + dt.t.Fatalf("Expected driver type to be %T. Instead got %T", + &driver{}, drv) + } + + dt.d = drv.(*driver) + return nil +} + +func TestOverlayInit(t *testing.T) { + if err := Init(&driverTester{t: t}); err != nil { + t.Fatal(err) + } +} + +func TestOverlayFiniWithoutConfig(t *testing.T) { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + cleanupDriver(t, dt) +} + +func TestOverlayNilConfig(t *testing.T) { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + if err := dt.d.Config(nil); err != nil { + t.Fatal(err) + } + + cleanupDriver(t, dt) +} + +func TestOverlayConfig(t *testing.T) { + dt := setupDriver(t) + + time.Sleep(1 * time.Second) + + d := dt.d.(*driver) + if d.notifyCh == nil { + t.Fatal("Driver notify channel wasn't initialzed after Config method") + } + + if d.exitCh == nil { + t.Fatal("Driver serfloop exit channel wasn't initialzed after Config method") + } + + if d.serfInstance == nil { + t.Fatal("Driver serfinstance hasn't been initialized after Config method") + } + + cleanupDriver(t, dt) +} + +func TestOverlayMultipleConfig(t *testing.T) { + dt := setupDriver(t) + + if err := dt.d.Config(nil); err == nil { + t.Fatal("Expected a failure, instead succeded") + } + + cleanupDriver(t, dt) +} + +func TestOverlayType(t *testing.T) { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + if dt.d.Type() != testNetworkType { + t.Fatalf("Expected Type() to return %q. Instead got %q", testNetworkType, + dt.d.Type()) + } +} diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go new file mode 100644 index 0000000000..b951edac93 --- /dev/null +++ b/libnetwork/drivers/overlay/peerdb.go @@ -0,0 +1,280 @@ +package overlay + +import ( + "fmt" + "net" + "sync" + "syscall" + + "github.com/docker/libnetwork/types" +) + +type peerKey struct { + peerIP net.IP + peerMac net.HardwareAddr +} + +type peerEntry struct { + eid types.UUID + vtep net.IP + inSandbox bool + isLocal bool +} + +type peerMap struct { + mp map[string]peerEntry + sync.Mutex +} + +type peerNetworkMap struct { + mp map[types.UUID]peerMap + sync.Mutex +} + +func (pKey peerKey) String() string { + return fmt.Sprintf("%s %s", pKey.peerIP, pKey.peerMac) +} + +func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error { + ipB, err := state.Token(true, nil) + if err != nil { + return err + } + + pKey.peerIP = net.ParseIP(string(ipB)) + + macB, err := state.Token(true, nil) + if err != nil { + return err + } + + pKey.peerMac, err = net.ParseMAC(string(macB)) + if err != nil { + return err + } + + return nil +} + +var peerDbWg sync.WaitGroup + +func (d *driver) peerDbWalk(nid types.UUID, f func(*peerKey, *peerEntry) bool) error { + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.Unlock() + return nil + } + d.peerDb.Unlock() + + pMap.Lock() + for pKeyStr, pEntry := range pMap.mp { + var pKey peerKey + if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil { + fmt.Printf("peer key scan failed: %v", err) + } + + if f(&pKey, &pEntry) { + pMap.Unlock() + return nil + } + } + pMap.Unlock() + + return nil +} + +func (d *driver) peerDbSearch(nid types.UUID, peerIP net.IP) (net.HardwareAddr, net.IP, error) { + var ( + peerMac net.HardwareAddr + vtep net.IP + found bool + ) + + err := d.peerDbWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool { + if pKey.peerIP.Equal(peerIP) { + peerMac = pKey.peerMac + vtep = pEntry.vtep + found = true + return found + } + + return found + }) + + if err != nil { + return nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err) + } + + if !found { + return nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP) + } + + return peerMac, vtep, nil +} + +func (d *driver) peerDbAdd(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP, isLocal bool) { + + peerDbWg.Wait() + + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.mp[nid] = peerMap{ + mp: make(map[string]peerEntry), + } + + pMap = d.peerDb.mp[nid] + } + d.peerDb.Unlock() + + pKey := peerKey{ + peerIP: peerIP, + peerMac: peerMac, + } + + pEntry := peerEntry{ + eid: eid, + vtep: vtep, + isLocal: isLocal, + } + + pMap.Lock() + pMap.mp[pKey.String()] = pEntry + pMap.Unlock() +} + +func (d *driver) peerDbDelete(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP) { + peerDbWg.Wait() + + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.Unlock() + return + } + d.peerDb.Unlock() + + pKey := peerKey{ + peerIP: peerIP, + peerMac: peerMac, + } + + pMap.Lock() + delete(pMap.mp, pKey.String()) + pMap.Unlock() +} + +func (d *driver) peerDbUpdateSandbox(nid types.UUID) { + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.Unlock() + return + } + d.peerDb.Unlock() + + peerDbWg.Add(1) + + var peerOps []func() + pMap.Lock() + for pKeyStr, pEntry := range pMap.mp { + var pKey peerKey + if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil { + fmt.Printf("peer key scan failed: %v", err) + } + + if pEntry.isLocal { + continue + } + + op := func() { + if err := d.peerAdd(nid, pEntry.eid, pKey.peerIP, + pKey.peerMac, pEntry.vtep, + false); err != nil { + fmt.Printf("peerdbupdate in sandbox failed for ip %s and mac %s: %v", + pKey.peerIP, pKey.peerMac, err) + } + } + + peerOps = append(peerOps, op) + } + pMap.Unlock() + + for _, op := range peerOps { + op() + } + + peerDbWg.Done() +} + +func (d *driver) peerAdd(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + + if err := validateID(nid, eid); err != nil { + return err + } + + if updateDb { + d.peerDbAdd(nid, eid, peerIP, peerMac, vtep, false) + } + + n := d.network(nid) + if n == nil { + return nil + } + + sbox := n.sandbox() + if sbox == nil { + return nil + } + + // Add neighbor entry for the peer IP + if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(n.vxlanName)); err != nil { + return fmt.Errorf("could not add neigbor entry into the sandbox: %v", err) + } + + // Add fdb entry to the bridge for the peer mac + if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(n.vxlanName), + sbox.NeighborOptions().Family(syscall.AF_BRIDGE)); err != nil { + return fmt.Errorf("could not add fdb entry into the sandbox: %v", err) + } + + return nil +} + +func (d *driver) peerDelete(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + + if err := validateID(nid, eid); err != nil { + return err + } + + if updateDb { + d.peerDbDelete(nid, eid, peerIP, peerMac, vtep) + } + + n := d.network(nid) + if n == nil { + return nil + } + + sbox := n.sandbox() + if sbox == nil { + return nil + } + + // Delete fdb entry to the bridge for the peer mac + if err := sbox.DeleteNeighbor(vtep, peerMac); err != nil { + return fmt.Errorf("could not delete fdb entry into the sandbox: %v", err) + } + + // Delete neighbor entry for the peer IP + if err := sbox.DeleteNeighbor(peerIP, peerMac); err != nil { + return fmt.Errorf("could not delete neigbor entry into the sandbox: %v", err) + } + + return nil +} diff --git a/libnetwork/drivers_linux.go b/libnetwork/drivers_linux.go index 7de28f95eb..b8294d452e 100644 --- a/libnetwork/drivers_linux.go +++ b/libnetwork/drivers_linux.go @@ -5,6 +5,7 @@ import ( "github.com/docker/libnetwork/drivers/bridge" "github.com/docker/libnetwork/drivers/host" "github.com/docker/libnetwork/drivers/null" + o "github.com/docker/libnetwork/drivers/overlay" "github.com/docker/libnetwork/drivers/remote" ) @@ -14,6 +15,7 @@ func initDrivers(dc driverapi.DriverCallback) error { host.Init, null.Init, remote.Init, + o.Init, } { if err := fn(dc); err != nil { return err diff --git a/libnetwork/netlabel/labels.go b/libnetwork/netlabel/labels.go index 799ef5a08a..42779be8a4 100644 --- a/libnetwork/netlabel/labels.go +++ b/libnetwork/netlabel/labels.go @@ -1,5 +1,7 @@ package netlabel +import "strings" + const ( // Prefix constant marks the reserved label space for libnetwork Prefix = "com.docker.network" @@ -21,4 +23,30 @@ const ( //EnableIPv6 constant represents enabling IPV6 at network level EnableIPv6 = Prefix + ".enable_ipv6" + + // KVProvider constant represents the KV provider backend + KVProvider = DriverPrefix + ".kv_provider" + + // KVProviderURL constant represents the KV provider URL + KVProviderURL = DriverPrefix + ".kv_provider_url" + + // OverlayBindInterface constant represents overlay driver bind interface + OverlayBindInterface = DriverPrefix + ".overlay.bind_interface" + + // OverlayNeighborIP constant represents overlay driver neighbor IP + OverlayNeighborIP = DriverPrefix + ".overlay.neighbor_ip" ) + +// Key extracts the key portion of the label +func Key(label string) string { + kv := strings.SplitN(label, "=", 2) + + return kv[0] +} + +// Value extracts the value portion of the label +func Value(label string) string { + kv := strings.SplitN(label, "=", 2) + + return kv[1] +}