diff --git a/libnetwork/drivers/overlay/joinleave.go b/libnetwork/drivers/overlay/joinleave.go new file mode 100644 index 0000000000..474970bcf5 --- /dev/null +++ b/libnetwork/drivers/overlay/joinleave.go @@ -0,0 +1,99 @@ +package overlay + +import ( + "fmt" + + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/types" + "github.com/vishvananda/netlink" +) + +// Join method is invoked when a Sandbox is attached to an endpoint. +func (d *driver) Join(nid, eid types.UUID, sboxKey string, jinfo driverapi.JoinInfo, options map[string]interface{}) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("could not find endpoint with id %s", eid) + } + + if err := n.joinSandbox(); err != nil { + return fmt.Errorf("network sandbox join failed: %v", + err) + } + + sbox := n.sandbox() + + name1, name2, err := createVethPair() + if err != nil { + return err + } + + if err := sbox.AddInterface(name1, "veth", + sbox.InterfaceOptions().Master("bridge1")); err != nil { + return fmt.Errorf("could not add veth pair inside the network sandbox: %v", err) + } + + veth, err := netlink.LinkByName(name2) + if err != nil { + return fmt.Errorf("could not find link by name %s: %v", name2, err) + } + + if err := netlink.LinkSetHardwareAddr(veth, ep.mac); err != nil { + return fmt.Errorf("could not set mac address to the container interface: %v", err) + } + + for _, iNames := range jinfo.InterfaceNames() { + // Make sure to set names on the correct interface ID. + if iNames.ID() == 1 { + err = iNames.SetNames(name2, "eth") + if err != nil { + return err + } + } + } + + err = jinfo.SetGateway(bridgeIP.IP) + if err != nil { + return err + } + + d.peerDbAdd(nid, eid, ep.addr.IP, ep.mac, + d.serfInstance.LocalMember().Addr, true) + d.notifyCh <- ovNotify{ + action: "join", + nid: nid, + eid: eid, + } + + return nil +} + +// Leave method is invoked when a Sandbox detaches from an endpoint. +func (d *driver) Leave(nid, eid types.UUID) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + d.notifyCh <- ovNotify{ + action: "leave", + nid: nid, + eid: eid, + } + + n.leaveSandbox() + + return nil +} diff --git a/libnetwork/drivers/overlay/ov_endpoint.go b/libnetwork/drivers/overlay/ov_endpoint.go new file mode 100644 index 0000000000..cbcb6074aa --- /dev/null +++ b/libnetwork/drivers/overlay/ov_endpoint.go @@ -0,0 +1,110 @@ +package overlay + +import ( + "encoding/binary" + "fmt" + "net" + + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/netutils" + "github.com/docker/libnetwork/types" +) + +type endpointTable map[types.UUID]*endpoint + +type endpoint struct { + id types.UUID + mac net.HardwareAddr + addr *net.IPNet +} + +func (n *network) endpoint(eid types.UUID) *endpoint { + n.Lock() + defer n.Unlock() + + return n.endpoints[eid] +} + +func (n *network) addEndpoint(ep *endpoint) { + n.Lock() + n.endpoints[ep.id] = ep + n.Unlock() +} + +func (n *network) deleteEndpoint(eid types.UUID) { + n.Lock() + delete(n.endpoints, eid) + n.Unlock() +} + +func (d *driver) CreateEndpoint(nid, eid types.UUID, epInfo driverapi.EndpointInfo, + epOptions map[string]interface{}) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("network id %q not found", nid) + } + + ep := &endpoint{ + id: eid, + } + + if epInfo != nil && (len(epInfo.Interfaces()) > 0) { + addr := epInfo.Interfaces()[0].Address() + ep.addr = &addr + ep.mac = epInfo.Interfaces()[0].MacAddress() + n.addEndpoint(ep) + return nil + } + + ipID, err := d.ipAllocator.GetID() + if err != nil { + return fmt.Errorf("could not allocate ip from subnet %s: %v", + bridgeSubnet.String(), err) + } + + ep.addr = &net.IPNet{ + Mask: bridgeSubnet.Mask, + } + ep.addr.IP = make([]byte, 4) + + binary.BigEndian.PutUint32(ep.addr.IP, bridgeSubnetInt+ipID) + + ep.mac = netutils.GenerateRandomMAC() + + err = epInfo.AddInterface(1, ep.mac, *ep.addr, net.IPNet{}) + if err != nil { + return fmt.Errorf("could not add interface to endpoint info: %v", err) + } + + n.addEndpoint(ep) + + return nil +} + +func (d *driver) DeleteEndpoint(nid, eid types.UUID) error { + if err := validateID(nid, eid); err != nil { + return err + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("network id %q not found", nid) + } + + ep := n.endpoint(eid) + if ep == nil { + return fmt.Errorf("endpoint id %q not found", eid) + } + + d.ipAllocator.Release(binary.BigEndian.Uint32(ep.addr.IP) - bridgeSubnetInt) + n.deleteEndpoint(eid) + return nil +} + +func (d *driver) EndpointOperInfo(nid, eid types.UUID) (map[string]interface{}, error) { + return make(map[string]interface{}, 0), nil +} diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go new file mode 100644 index 0000000000..94caa941b8 --- /dev/null +++ b/libnetwork/drivers/overlay/ov_network.go @@ -0,0 +1,325 @@ +package overlay + +import ( + "encoding/json" + "fmt" + "net" + "sync" + "syscall" + + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/ipallocator" + "github.com/docker/libnetwork/sandbox" + "github.com/docker/libnetwork/types" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netlink/nl" +) + +type networkTable map[types.UUID]*network + +type network struct { + id types.UUID + vni uint32 + dbIndex uint64 + sbox sandbox.Sandbox + endpoints endpointTable + ipAllocator *ipallocator.IPAllocator + gw net.IP + vxlanName string + driver *driver + joinCnt int + sync.Mutex +} + +func (d *driver) CreateNetwork(id types.UUID, option map[string]interface{}) error { + if id == "" { + return fmt.Errorf("invalid network id") + } + + n := &network{ + id: id, + driver: d, + endpoints: endpointTable{}, + } + + n.gw = bridgeIP.IP + + d.addNetwork(n) + + if err := n.obtainVxlanID(); err != nil { + return err + } + + return nil +} + +func (d *driver) DeleteNetwork(nid types.UUID) error { + if nid == "" { + return fmt.Errorf("invalid network id") + } + + n := d.network(nid) + if n == nil { + return fmt.Errorf("could not find network with id %s", nid) + } + + d.deleteNetwork(nid) + + return n.releaseVxlanID() +} + +func (n *network) joinSandbox() error { + n.Lock() + if n.joinCnt != 0 { + n.joinCnt++ + n.Unlock() + return nil + } + n.joinCnt++ + n.Unlock() + + return n.initSandbox() +} + +func (n *network) leaveSandbox() { + n.Lock() + n.joinCnt-- + if n.joinCnt != 0 { + n.Unlock() + return + } + n.Unlock() + + n.destroySandbox() +} + +func (n *network) destroySandbox() { + sbox := n.sandbox() + if sbox != nil { + for _, iface := range sbox.Info().Interfaces() { + iface.Remove() + } + + if err := deleteVxlan(n.vxlanName); err != nil { + logrus.Warnf("could not cleanup sandbox properly: %v", err) + } + + sbox.Destroy() + } +} + +func (n *network) initSandbox() error { + sbox, err := sandbox.NewSandbox(sandbox.GenerateKey(string(n.id)), true) + if err != nil { + return fmt.Errorf("could not create network sandbox: %v", err) + } + + // Add a bridge inside the namespace + if err := sbox.AddInterface("bridge1", "br", + sbox.InterfaceOptions().Address(bridgeIP), + sbox.InterfaceOptions().Bridge(true)); err != nil { + return fmt.Errorf("could not create bridge inside the network sandbox: %v", err) + } + + vxlanName, err := createVxlan(n.vxlanID()) + if err != nil { + return err + } + + if err := sbox.AddInterface(vxlanName, "vxlan", + sbox.InterfaceOptions().Master("bridge1")); err != nil { + return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v", + err) + } + + n.vxlanName = vxlanName + + n.setSandbox(sbox) + + n.driver.peerDbUpdateSandbox(n.id) + + var nlSock *nl.NetlinkSocket + sbox.InvokeFunc(func() { + nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH) + if err != nil { + err = fmt.Errorf("failed to subscribe to neighbor group netlink messages") + } + }) + + go n.watchMiss(nlSock) + + return nil +} + +func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { + for { + msgs, err := nlSock.Recieve() + if err != nil { + logrus.Errorf("Failed to receive from netlink: %v ", err) + continue + } + + for _, msg := range msgs { + if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH { + continue + } + + neigh, err := netlink.NeighDeserialize(msg.Data) + if err != nil { + logrus.Errorf("Failed to deserialize netlink ndmsg: %v", err) + continue + } + + if neigh.IP.To16() != nil { + continue + } + + if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 { + continue + } + + mac, vtep, err := n.driver.resolvePeer(n.id, neigh.IP) + if err != nil { + logrus.Errorf("could not resolve peer %q: %v", neigh.IP, err) + continue + } + + if err := n.driver.peerAdd(n.id, types.UUID("dummy"), neigh.IP, mac, vtep, true); err != nil { + logrus.Errorf("could not add neighbor entry for missed peer: %v", err) + } + } + } +} + +func (d *driver) addNetwork(n *network) { + d.Lock() + d.networks[n.id] = n + d.Unlock() +} + +func (d *driver) deleteNetwork(nid types.UUID) { + d.Lock() + delete(d.networks, nid) + d.Unlock() +} + +func (d *driver) network(nid types.UUID) *network { + d.Lock() + defer d.Unlock() + + return d.networks[nid] +} + +func (n *network) sandbox() sandbox.Sandbox { + n.Lock() + defer n.Unlock() + + return n.sbox +} + +func (n *network) setSandbox(sbox sandbox.Sandbox) { + n.Lock() + n.sbox = sbox + n.Unlock() +} + +func (n *network) vxlanID() uint32 { + n.Lock() + defer n.Unlock() + + return n.vni +} + +func (n *network) setVxlanID(vni uint32) { + n.Lock() + n.vni = vni + n.Unlock() +} + +func (n *network) Key() []string { + return []string{"overlay", "network", string(n.id)} +} + +func (n *network) KeyPrefix() []string { + return []string{"overlay", "network"} +} + +func (n *network) Value() []byte { + b, err := json.Marshal(n.vxlanID()) + if err != nil { + return []byte{} + } + + return b +} + +func (n *network) Index() uint64 { + return n.dbIndex +} + +func (n *network) SetIndex(index uint64) { + n.dbIndex = index +} + +func (n *network) writeToStore() error { + return n.driver.store.PutObjectAtomic(n) +} + +func (n *network) releaseVxlanID() error { + if n.driver.store == nil { + return fmt.Errorf("no datastore configured. cannot release vxlan id") + } + + if n.vxlanID() == 0 { + return nil + } + + if err := n.driver.store.DeleteObjectAtomic(n); err != nil { + if err == datastore.ErrKeyModified || err == datastore.ErrKeyNotFound { + // In both the above cases we can safely assume that the key has been removed by some other + // instance and so simply get out of here + return nil + } + + return fmt.Errorf("failed to delete network to vxlan id map: %v", err) + } + + n.driver.vxlanIdm.Release(n.vxlanID()) + n.setVxlanID(0) + return nil +} + +func (n *network) obtainVxlanID() error { + if n.driver.store == nil { + return fmt.Errorf("no datastore configured. cannot obtain vxlan id") + } + + for { + var vxlanID uint32 + if err := n.driver.store.GetObject(datastore.Key(n.Key()...), + &vxlanID); err != nil { + if err == datastore.ErrKeyNotFound { + vxlanID, err = n.driver.vxlanIdm.GetID() + if err != nil { + return fmt.Errorf("failed to allocate vxlan id: %v", err) + } + + n.setVxlanID(vxlanID) + if err := n.writeToStore(); err != nil { + n.driver.vxlanIdm.Release(n.vxlanID()) + n.setVxlanID(0) + if err == datastore.ErrKeyModified { + continue + } + return fmt.Errorf("failed to update data store with vxlan id: %v", err) + } + return nil + } + return fmt.Errorf("failed to obtain vxlan id from data store: %v", err) + } + + n.setVxlanID(vxlanID) + return nil + } +} diff --git a/libnetwork/drivers/overlay/ov_serf.go b/libnetwork/drivers/overlay/ov_serf.go new file mode 100644 index 0000000000..5f467d19fa --- /dev/null +++ b/libnetwork/drivers/overlay/ov_serf.go @@ -0,0 +1,249 @@ +package overlay + +import ( + "fmt" + "net" + "strings" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/types" + "github.com/hashicorp/serf/serf" +) + +type ovNotify struct { + action string + eid types.UUID + nid types.UUID +} + +type logWriter struct{} + +func (l *logWriter) Write(p []byte) (int, error) { + str := string(p) + + switch { + case strings.Contains(str, "[WARN]"): + logrus.Warn(str) + case strings.Contains(str, "[DEBUG]"): + logrus.Debug(str) + case strings.Contains(str, "[INFO]"): + logrus.Info(str) + case strings.Contains(str, "[ERR]"): + logrus.Error(str) + } + + return len(p), nil +} + +func getBindAddr(ifaceName string) (string, error) { + iface, err := net.InterfaceByName(ifaceName) + if err != nil { + return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err) + } + + addrs, err := iface.Addrs() + if err != nil { + return "", fmt.Errorf("failed to get interface addresses: %v", err) + } + + for _, a := range addrs { + addr, ok := a.(*net.IPNet) + if !ok { + continue + } + addrIP := addr.IP + + if addrIP.IsLinkLocalUnicast() { + continue + } + + return addrIP.String(), nil + } + + return "", fmt.Errorf("failed to get bind address") +} + +func (d *driver) serfInit() error { + var err error + + config := serf.DefaultConfig() + config.Init() + if d.ifaceName != "" { + bindAddr, err := getBindAddr(d.ifaceName) + if err != nil { + return fmt.Errorf("getBindAddr error: %v", err) + } + config.MemberlistConfig.BindAddr = bindAddr + } + + d.eventCh = make(chan serf.Event, 4) + config.EventCh = d.eventCh + config.UserCoalescePeriod = 1 * time.Second + config.UserQuiescentPeriod = 50 * time.Millisecond + + config.LogOutput = logrus.StandardLogger().Out + + s, err := serf.Create(config) + if err != nil { + return fmt.Errorf("failed to create cluster node: %v", err) + } + defer func() { + if err != nil { + s.Shutdown() + } + }() + + if d.neighIP != "" { + if _, err = s.Join([]string{d.neighIP}, false); err != nil { + return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", + d.neighIP, err) + } + } + + d.serfInstance = s + + d.notifyCh = make(chan ovNotify) + d.exitCh = make(chan chan struct{}) + + go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh) + return nil +} + +func (d *driver) notifyEvent(event ovNotify) { + n := d.network(event.nid) + ep := n.endpoint(event.eid) + + ePayload := fmt.Sprintf("%s %s %s", event.action, ep.addr.IP.String(), ep.mac.String()) + eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(), + event.nid, event.eid) + + if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil { + fmt.Printf("Sending user event failed: %v\n", err) + } +} + +func (d *driver) processEvent(u serf.UserEvent) { + fmt.Printf("Received user event name:%s, payload:%s\n", u.Name, + string(u.Payload)) + + var dummy, action, vtepStr, nid, eid, ipStr, macStr string + if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil { + fmt.Printf("Failed to scan name string: %v\n", err) + } + + if _, err := fmt.Sscan(string(u.Payload), &action, + &ipStr, &macStr); err != nil { + fmt.Printf("Failed to scan value string: %v\n", err) + } + + fmt.Printf("Parsed data = %s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, macStr) + + mac, err := net.ParseMAC(macStr) + if err != nil { + fmt.Printf("Failed to parse mac: %v\n", err) + } + + if d.serfInstance.LocalMember().Addr.String() == vtepStr { + return + } + + switch action { + case "join": + if err := d.peerAdd(types.UUID(nid), types.UUID(eid), net.ParseIP(ipStr), mac, + net.ParseIP(vtepStr), true); err != nil { + fmt.Printf("Peer add failed in the driver: %v\n", err) + } + case "leave": + if err := d.peerDelete(types.UUID(nid), types.UUID(eid), net.ParseIP(ipStr), mac, + net.ParseIP(vtepStr), true); err != nil { + fmt.Printf("Peer delete failed in the driver: %v\n", err) + } + } +} + +func (d *driver) processQuery(q *serf.Query) { + fmt.Printf("Received query name:%s, payload:%s\n", q.Name, + string(q.Payload)) + + var nid, ipStr string + if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil { + fmt.Printf("Failed to scan query payload string: %v\n", err) + } + + peerMac, vtep, err := d.peerDbSearch(types.UUID(nid), net.ParseIP(ipStr)) + if err != nil { + return + } + + q.Respond([]byte(fmt.Sprintf("%s %s", peerMac.String(), vtep.String()))) +} + +func (d *driver) resolvePeer(nid types.UUID, peerIP net.IP) (net.HardwareAddr, net.IP, error) { + qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String()) + resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil) + if err != nil { + return nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err) + } + + respCh := resp.ResponseCh() + select { + case r := <-respCh: + var macStr, vtepStr string + if _, err := fmt.Sscan(string(r.Payload), &macStr, &vtepStr); err != nil { + return nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err) + } + + mac, err := net.ParseMAC(macStr) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse mac: %v", err) + } + + return mac, net.ParseIP(vtepStr), nil + + case <-time.After(time.Second): + return nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster") + } +} + +func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, + exitCh chan chan struct{}) { + + for { + select { + case notify, ok := <-notifyCh: + if !ok { + break + } + + d.notifyEvent(notify) + case ch, ok := <-exitCh: + if !ok { + break + } + + if err := d.serfInstance.Leave(); err != nil { + fmt.Printf("failed leaving the cluster: %v\n", err) + } + + d.serfInstance.Shutdown() + close(ch) + return + case e, ok := <-eventCh: + if !ok { + break + } + + if e.EventType() == serf.EventQuery { + d.processQuery(e.(*serf.Query)) + break + } + + u, ok := e.(serf.UserEvent) + if !ok { + break + } + d.processEvent(u) + } + } +} diff --git a/libnetwork/drivers/overlay/ov_utils.go b/libnetwork/drivers/overlay/ov_utils.go new file mode 100644 index 0000000000..2349c7e81f --- /dev/null +++ b/libnetwork/drivers/overlay/ov_utils.go @@ -0,0 +1,80 @@ +package overlay + +import ( + "fmt" + + "github.com/docker/libnetwork/netutils" + "github.com/docker/libnetwork/types" + "github.com/vishvananda/netlink" +) + +func validateID(nid, eid types.UUID) error { + if nid == "" { + return fmt.Errorf("invalid network id") + } + + if eid == "" { + return fmt.Errorf("invalid endpoint id") + } + + return nil +} + +func createVethPair() (string, string, error) { + // Generate a name for what will be the host side pipe interface + name1, err := netutils.GenerateIfaceName(vethPrefix, vethLen) + if err != nil { + return "", "", fmt.Errorf("error generating veth name1: %v", err) + } + + // Generate a name for what will be the sandbox side pipe interface + name2, err := netutils.GenerateIfaceName(vethPrefix, vethLen) + if err != nil { + return "", "", fmt.Errorf("error generating veth name2: %v", err) + } + + // Generate and add the interface pipe host <-> sandbox + veth := &netlink.Veth{ + LinkAttrs: netlink.LinkAttrs{Name: name1, TxQLen: 0}, + PeerName: name2} + if err := netlink.LinkAdd(veth); err != nil { + return "", "", fmt.Errorf("error creating veth pair: %v", err) + } + + return name1, name2, nil +} + +func createVxlan(vni uint32) (string, error) { + name, err := netutils.GenerateIfaceName("vxlan", 7) + if err != nil { + return "", fmt.Errorf("error generating vxlan name: %v", err) + } + + vxlan := &netlink.Vxlan{ + LinkAttrs: netlink.LinkAttrs{Name: name}, + VxlanId: int(vni), + Learning: true, + Proxy: true, + L3miss: true, + L2miss: true, + } + + if err := netlink.LinkAdd(vxlan); err != nil { + return "", fmt.Errorf("error creating vxlan interface: %v", err) + } + + return name, nil +} + +func deleteVxlan(name string) error { + link, err := netlink.LinkByName(name) + if err != nil { + return fmt.Errorf("failed to find vxlan interface with name %s: %v", name, err) + } + + if err := netlink.LinkDel(link); err != nil { + return fmt.Errorf("error deleting vxlan interface: %v", err) + } + + return nil +} diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go new file mode 100644 index 0000000000..f911926980 --- /dev/null +++ b/libnetwork/drivers/overlay/overlay.go @@ -0,0 +1,157 @@ +package overlay + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + + "github.com/docker/libnetwork/config" + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/driverapi" + "github.com/docker/libnetwork/idm" + "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/types" + "github.com/hashicorp/serf/serf" +) + +const ( + networkType = "overlay" + vethPrefix = "veth" + vethLen = 7 + vxlanIDStart = 256 + vxlanIDEnd = 1000 +) + +type driver struct { + eventCh chan serf.Event + notifyCh chan ovNotify + exitCh chan chan struct{} + ifaceName string + neighIP string + peerDb peerNetworkMap + serfInstance *serf.Serf + networks networkTable + store datastore.DataStore + ipAllocator *idm.Idm + vxlanIdm *idm.Idm + sync.Once + sync.Mutex +} + +var ( + bridgeSubnet, bridgeIP *net.IPNet + once sync.Once + bridgeSubnetInt uint32 +) + +func onceInit() { + var err error + _, bridgeSubnet, err = net.ParseCIDR("172.21.0.0/16") + if err != nil { + panic("could not parse cid 172.21.0.0/16") + } + + bridgeSubnetInt = binary.BigEndian.Uint32(bridgeSubnet.IP.To4()) + + ip, subnet, err := net.ParseCIDR("172.21.255.254/16") + if err != nil { + panic("could not parse cid 172.21.255.254/16") + } + + bridgeIP = &net.IPNet{ + IP: ip, + Mask: subnet.Mask, + } +} + +// Init registers a new instance of overlay driver +func Init(dc driverapi.DriverCallback) error { + once.Do(onceInit) + + c := driverapi.Capability{ + Scope: driverapi.GlobalScope, + } + + return dc.RegisterDriver(networkType, &driver{ + networks: networkTable{}, + peerDb: peerNetworkMap{ + mp: map[types.UUID]peerMap{}, + }, + }, c) +} + +// Fini cleans up the driver resources +func Fini(drv driverapi.Driver) { + d := drv.(*driver) + + if d.exitCh != nil { + waitCh := make(chan struct{}) + + d.exitCh <- waitCh + + <-waitCh + } +} + +func (d *driver) Config(option map[string]interface{}) error { + var onceDone bool + var err error + + d.Do(func() { + onceDone = true + + if ifaceName, ok := option[netlabel.OverlayBindInterface]; ok { + d.ifaceName = ifaceName.(string) + } + + if neighIP, ok := option[netlabel.OverlayNeighborIP]; ok { + d.neighIP = neighIP.(string) + } + + provider, provOk := option[netlabel.KVProvider] + provURL, urlOk := option[netlabel.KVProviderURL] + + if provOk && urlOk { + cfg := &config.DatastoreCfg{ + Client: config.DatastoreClientCfg{ + Provider: provider.(string), + Address: provURL.(string), + }, + } + d.store, err = datastore.NewDataStore(cfg) + if err != nil { + err = fmt.Errorf("failed to initialize data store: %v", err) + return + } + } + + d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd) + if err != nil { + err = fmt.Errorf("failed to initialize vxlan id manager: %v", err) + return + } + + d.ipAllocator, err = idm.New(d.store, "ipam-id", 1, 0xFFFF-2) + if err != nil { + err = fmt.Errorf("failed to initalize ipam id manager: %v", err) + return + } + + err = d.serfInit() + if err != nil { + err = fmt.Errorf("initializing serf instance failed: %v", err) + } + + }) + + if !onceDone { + return fmt.Errorf("config already applied to driver") + } + + return err +} + +func (d *driver) Type() string { + return networkType +} diff --git a/libnetwork/drivers/overlay/overlay_test.go b/libnetwork/drivers/overlay/overlay_test.go new file mode 100644 index 0000000000..cc22e02f91 --- /dev/null +++ b/libnetwork/drivers/overlay/overlay_test.go @@ -0,0 +1,130 @@ +package overlay + +import ( + "testing" + "time" + + "github.com/docker/libnetwork/driverapi" +) + +type driverTester struct { + t *testing.T + d driverapi.Driver +} + +const testNetworkType = "overlay" + +func setupDriver(t *testing.T) *driverTester { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + opt := make(map[string]interface{}) + if err := dt.d.Config(opt); err != nil { + t.Fatal(err) + } + + return dt +} + +func cleanupDriver(t *testing.T, dt *driverTester) { + ch := make(chan struct{}) + go func() { + Fini(dt.d) + close(ch) + }() + + select { + case <-ch: + case <-time.After(10 * time.Second): + t.Fatal("test timed out because Fini() did not return on time") + } +} + +func (dt *driverTester) RegisterDriver(name string, drv driverapi.Driver, + cap driverapi.Capability) error { + if name != testNetworkType { + dt.t.Fatalf("Expected driver register name to be %q. Instead got %q", + testNetworkType, name) + } + + if _, ok := drv.(*driver); !ok { + dt.t.Fatalf("Expected driver type to be %T. Instead got %T", + &driver{}, drv) + } + + dt.d = drv.(*driver) + return nil +} + +func TestOverlayInit(t *testing.T) { + if err := Init(&driverTester{t: t}); err != nil { + t.Fatal(err) + } +} + +func TestOverlayFiniWithoutConfig(t *testing.T) { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + cleanupDriver(t, dt) +} + +func TestOverlayNilConfig(t *testing.T) { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + if err := dt.d.Config(nil); err != nil { + t.Fatal(err) + } + + cleanupDriver(t, dt) +} + +func TestOverlayConfig(t *testing.T) { + dt := setupDriver(t) + + time.Sleep(1 * time.Second) + + d := dt.d.(*driver) + if d.notifyCh == nil { + t.Fatal("Driver notify channel wasn't initialzed after Config method") + } + + if d.exitCh == nil { + t.Fatal("Driver serfloop exit channel wasn't initialzed after Config method") + } + + if d.serfInstance == nil { + t.Fatal("Driver serfinstance hasn't been initialized after Config method") + } + + cleanupDriver(t, dt) +} + +func TestOverlayMultipleConfig(t *testing.T) { + dt := setupDriver(t) + + if err := dt.d.Config(nil); err == nil { + t.Fatal("Expected a failure, instead succeded") + } + + cleanupDriver(t, dt) +} + +func TestOverlayType(t *testing.T) { + dt := &driverTester{t: t} + if err := Init(dt); err != nil { + t.Fatal(err) + } + + if dt.d.Type() != testNetworkType { + t.Fatalf("Expected Type() to return %q. Instead got %q", testNetworkType, + dt.d.Type()) + } +} diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go new file mode 100644 index 0000000000..b951edac93 --- /dev/null +++ b/libnetwork/drivers/overlay/peerdb.go @@ -0,0 +1,280 @@ +package overlay + +import ( + "fmt" + "net" + "sync" + "syscall" + + "github.com/docker/libnetwork/types" +) + +type peerKey struct { + peerIP net.IP + peerMac net.HardwareAddr +} + +type peerEntry struct { + eid types.UUID + vtep net.IP + inSandbox bool + isLocal bool +} + +type peerMap struct { + mp map[string]peerEntry + sync.Mutex +} + +type peerNetworkMap struct { + mp map[types.UUID]peerMap + sync.Mutex +} + +func (pKey peerKey) String() string { + return fmt.Sprintf("%s %s", pKey.peerIP, pKey.peerMac) +} + +func (pKey *peerKey) Scan(state fmt.ScanState, verb rune) error { + ipB, err := state.Token(true, nil) + if err != nil { + return err + } + + pKey.peerIP = net.ParseIP(string(ipB)) + + macB, err := state.Token(true, nil) + if err != nil { + return err + } + + pKey.peerMac, err = net.ParseMAC(string(macB)) + if err != nil { + return err + } + + return nil +} + +var peerDbWg sync.WaitGroup + +func (d *driver) peerDbWalk(nid types.UUID, f func(*peerKey, *peerEntry) bool) error { + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.Unlock() + return nil + } + d.peerDb.Unlock() + + pMap.Lock() + for pKeyStr, pEntry := range pMap.mp { + var pKey peerKey + if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil { + fmt.Printf("peer key scan failed: %v", err) + } + + if f(&pKey, &pEntry) { + pMap.Unlock() + return nil + } + } + pMap.Unlock() + + return nil +} + +func (d *driver) peerDbSearch(nid types.UUID, peerIP net.IP) (net.HardwareAddr, net.IP, error) { + var ( + peerMac net.HardwareAddr + vtep net.IP + found bool + ) + + err := d.peerDbWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool { + if pKey.peerIP.Equal(peerIP) { + peerMac = pKey.peerMac + vtep = pEntry.vtep + found = true + return found + } + + return found + }) + + if err != nil { + return nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err) + } + + if !found { + return nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP) + } + + return peerMac, vtep, nil +} + +func (d *driver) peerDbAdd(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP, isLocal bool) { + + peerDbWg.Wait() + + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.mp[nid] = peerMap{ + mp: make(map[string]peerEntry), + } + + pMap = d.peerDb.mp[nid] + } + d.peerDb.Unlock() + + pKey := peerKey{ + peerIP: peerIP, + peerMac: peerMac, + } + + pEntry := peerEntry{ + eid: eid, + vtep: vtep, + isLocal: isLocal, + } + + pMap.Lock() + pMap.mp[pKey.String()] = pEntry + pMap.Unlock() +} + +func (d *driver) peerDbDelete(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP) { + peerDbWg.Wait() + + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.Unlock() + return + } + d.peerDb.Unlock() + + pKey := peerKey{ + peerIP: peerIP, + peerMac: peerMac, + } + + pMap.Lock() + delete(pMap.mp, pKey.String()) + pMap.Unlock() +} + +func (d *driver) peerDbUpdateSandbox(nid types.UUID) { + d.peerDb.Lock() + pMap, ok := d.peerDb.mp[nid] + if !ok { + d.peerDb.Unlock() + return + } + d.peerDb.Unlock() + + peerDbWg.Add(1) + + var peerOps []func() + pMap.Lock() + for pKeyStr, pEntry := range pMap.mp { + var pKey peerKey + if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil { + fmt.Printf("peer key scan failed: %v", err) + } + + if pEntry.isLocal { + continue + } + + op := func() { + if err := d.peerAdd(nid, pEntry.eid, pKey.peerIP, + pKey.peerMac, pEntry.vtep, + false); err != nil { + fmt.Printf("peerdbupdate in sandbox failed for ip %s and mac %s: %v", + pKey.peerIP, pKey.peerMac, err) + } + } + + peerOps = append(peerOps, op) + } + pMap.Unlock() + + for _, op := range peerOps { + op() + } + + peerDbWg.Done() +} + +func (d *driver) peerAdd(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + + if err := validateID(nid, eid); err != nil { + return err + } + + if updateDb { + d.peerDbAdd(nid, eid, peerIP, peerMac, vtep, false) + } + + n := d.network(nid) + if n == nil { + return nil + } + + sbox := n.sandbox() + if sbox == nil { + return nil + } + + // Add neighbor entry for the peer IP + if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(n.vxlanName)); err != nil { + return fmt.Errorf("could not add neigbor entry into the sandbox: %v", err) + } + + // Add fdb entry to the bridge for the peer mac + if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(n.vxlanName), + sbox.NeighborOptions().Family(syscall.AF_BRIDGE)); err != nil { + return fmt.Errorf("could not add fdb entry into the sandbox: %v", err) + } + + return nil +} + +func (d *driver) peerDelete(nid, eid types.UUID, peerIP net.IP, + peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error { + + if err := validateID(nid, eid); err != nil { + return err + } + + if updateDb { + d.peerDbDelete(nid, eid, peerIP, peerMac, vtep) + } + + n := d.network(nid) + if n == nil { + return nil + } + + sbox := n.sandbox() + if sbox == nil { + return nil + } + + // Delete fdb entry to the bridge for the peer mac + if err := sbox.DeleteNeighbor(vtep, peerMac); err != nil { + return fmt.Errorf("could not delete fdb entry into the sandbox: %v", err) + } + + // Delete neighbor entry for the peer IP + if err := sbox.DeleteNeighbor(peerIP, peerMac); err != nil { + return fmt.Errorf("could not delete neigbor entry into the sandbox: %v", err) + } + + return nil +} diff --git a/libnetwork/drivers_linux.go b/libnetwork/drivers_linux.go index 7de28f95eb..b8294d452e 100644 --- a/libnetwork/drivers_linux.go +++ b/libnetwork/drivers_linux.go @@ -5,6 +5,7 @@ import ( "github.com/docker/libnetwork/drivers/bridge" "github.com/docker/libnetwork/drivers/host" "github.com/docker/libnetwork/drivers/null" + o "github.com/docker/libnetwork/drivers/overlay" "github.com/docker/libnetwork/drivers/remote" ) @@ -14,6 +15,7 @@ func initDrivers(dc driverapi.DriverCallback) error { host.Init, null.Init, remote.Init, + o.Init, } { if err := fn(dc); err != nil { return err diff --git a/libnetwork/netlabel/labels.go b/libnetwork/netlabel/labels.go index 799ef5a08a..42779be8a4 100644 --- a/libnetwork/netlabel/labels.go +++ b/libnetwork/netlabel/labels.go @@ -1,5 +1,7 @@ package netlabel +import "strings" + const ( // Prefix constant marks the reserved label space for libnetwork Prefix = "com.docker.network" @@ -21,4 +23,30 @@ const ( //EnableIPv6 constant represents enabling IPV6 at network level EnableIPv6 = Prefix + ".enable_ipv6" + + // KVProvider constant represents the KV provider backend + KVProvider = DriverPrefix + ".kv_provider" + + // KVProviderURL constant represents the KV provider URL + KVProviderURL = DriverPrefix + ".kv_provider_url" + + // OverlayBindInterface constant represents overlay driver bind interface + OverlayBindInterface = DriverPrefix + ".overlay.bind_interface" + + // OverlayNeighborIP constant represents overlay driver neighbor IP + OverlayNeighborIP = DriverPrefix + ".overlay.neighbor_ip" ) + +// Key extracts the key portion of the label +func Key(label string) string { + kv := strings.SplitN(label, "=", 2) + + return kv[0] +} + +// Value extracts the value portion of the label +func Value(label string) string { + kv := strings.SplitN(label, "=", 2) + + return kv[1] +}