mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Support for multiple subnets in a overlay network
Signed-off-by: Santhosh Manohar <santhosh@docker.com>
This commit is contained in:
parent
8dde3b2380
commit
6e327a5afb
5 changed files with 317 additions and 125 deletions
|
@ -24,10 +24,23 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
|
|||
return fmt.Errorf("could not find endpoint with id %s", eid)
|
||||
}
|
||||
|
||||
s := n.getSubnetforIP(ep.addr)
|
||||
if s == nil {
|
||||
return fmt.Errorf("could not find subnet for endpoint %s", eid)
|
||||
}
|
||||
|
||||
if err := n.obtainVxlanID(s); err != nil {
|
||||
return fmt.Errorf("couldn't get vxlan id for %q: %v", s.subnetIP.String(), err)
|
||||
}
|
||||
|
||||
if err := n.joinSandbox(); err != nil {
|
||||
return fmt.Errorf("network sandbox join failed: %v", err)
|
||||
}
|
||||
|
||||
if err := n.joinSubnetSandbox(s); err != nil {
|
||||
return fmt.Errorf("subnet sandbox join failed for %q: %v", s.subnetIP.String(), err)
|
||||
}
|
||||
|
||||
sbox := n.sandbox()
|
||||
|
||||
name1, name2, err := createVethPair()
|
||||
|
@ -48,7 +61,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
|
|||
}
|
||||
|
||||
if err := sbox.AddInterface(name1, "veth",
|
||||
sbox.InterfaceOptions().Master("bridge1")); err != nil {
|
||||
sbox.InterfaceOptions().Master(s.brName)); err != nil {
|
||||
return fmt.Errorf("could not add veth pair inside the network sandbox: %v", err)
|
||||
}
|
||||
|
||||
|
@ -72,7 +85,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
|
|||
}
|
||||
}
|
||||
|
||||
d.peerDbAdd(nid, eid, ep.addr.IP, ep.mac,
|
||||
d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac,
|
||||
net.ParseIP(d.bindAddress), true)
|
||||
d.pushLocalEndpointEvent("join", nid, eid)
|
||||
|
||||
|
|
|
@ -37,13 +37,18 @@ func (n *network) deleteEndpoint(eid string) {
|
|||
|
||||
func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
|
||||
epOptions map[string]interface{}) error {
|
||||
if err := validateID(nid, eid); err != nil {
|
||||
var err error
|
||||
|
||||
if err = validateID(nid, eid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n := d.network(nid)
|
||||
if n == nil {
|
||||
return fmt.Errorf("network id %q not found", nid)
|
||||
n, err = d.createNetworkfromStore(nid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("network id %q not found", nid)
|
||||
}
|
||||
}
|
||||
|
||||
ep := &endpoint{
|
||||
|
@ -51,11 +56,14 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
|
|||
addr: ifInfo.Address(),
|
||||
mac: ifInfo.MacAddress(),
|
||||
}
|
||||
|
||||
if ep.addr == nil {
|
||||
return fmt.Errorf("create endpoint was not passed interface IP address")
|
||||
}
|
||||
|
||||
if s := n.getSubnetforIP(ep.addr); s == nil {
|
||||
return fmt.Errorf("no matching subnet for IP %q in network %q\n", ep.addr, nid)
|
||||
}
|
||||
|
||||
if ep.mac == nil {
|
||||
ep.mac = netutils.GenerateMACFromIP(ep.addr.IP)
|
||||
if err := ifInfo.SetMacAddress(ep.mac); err != nil {
|
||||
|
|
|
@ -10,37 +10,47 @@ import (
|
|||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/driverapi"
|
||||
"github.com/docker/libnetwork/netutils"
|
||||
"github.com/docker/libnetwork/osl"
|
||||
"github.com/docker/libnetwork/types"
|
||||
"github.com/vishvananda/netlink"
|
||||
"github.com/vishvananda/netlink/nl"
|
||||
)
|
||||
|
||||
type networkTable map[string]*network
|
||||
|
||||
type subnet struct {
|
||||
once *sync.Once
|
||||
vxlanName string
|
||||
brName string
|
||||
vni uint32
|
||||
initErr error
|
||||
subnetIP *net.IPNet
|
||||
gwIP *net.IPNet
|
||||
}
|
||||
|
||||
type network struct {
|
||||
id string
|
||||
vni uint32
|
||||
dbIndex uint64
|
||||
dbExists bool
|
||||
sbox osl.Sandbox
|
||||
endpoints endpointTable
|
||||
vxlanName string
|
||||
driver *driver
|
||||
joinCnt int
|
||||
once *sync.Once
|
||||
initEpoch int
|
||||
initErr error
|
||||
subnets []*net.IPNet
|
||||
gateways []*net.IPNet
|
||||
subnets []*subnet
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
|
||||
var err error
|
||||
if id == "" {
|
||||
return fmt.Errorf("invalid network id")
|
||||
}
|
||||
|
||||
if err := d.configure(); err != nil {
|
||||
if err = d.configure(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -49,23 +59,54 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat
|
|||
driver: d,
|
||||
endpoints: endpointTable{},
|
||||
once: &sync.Once{},
|
||||
subnets: []*subnet{},
|
||||
}
|
||||
|
||||
n.subnets = make([]*net.IPNet, len(ipV4Data))
|
||||
n.gateways = make([]*net.IPNet, len(ipV4Data))
|
||||
|
||||
for i, ipd := range ipV4Data {
|
||||
n.subnets[i] = ipd.Pool
|
||||
n.gateways[i] = ipd.Gateway
|
||||
for _, ipd := range ipV4Data {
|
||||
s := &subnet{
|
||||
subnetIP: ipd.Pool,
|
||||
gwIP: ipd.Gateway,
|
||||
once: &sync.Once{},
|
||||
}
|
||||
n.subnets = append(n.subnets, s)
|
||||
}
|
||||
|
||||
for {
|
||||
// If the datastore has the network object already
|
||||
// there is no need to do a write.
|
||||
err = d.store.GetObject(datastore.Key(n.Key()...), n)
|
||||
if err == nil || err != datastore.ErrKeyNotFound {
|
||||
break
|
||||
}
|
||||
|
||||
err = n.writeToStore()
|
||||
if err == nil || err != datastore.ErrKeyModified {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
|
||||
}
|
||||
d.addNetwork(n)
|
||||
|
||||
if err := n.obtainVxlanID(); err != nil {
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) createNetworkfromStore(nid string) (*network, error) {
|
||||
n := &network{
|
||||
id: nid,
|
||||
driver: d,
|
||||
endpoints: endpointTable{},
|
||||
once: &sync.Once{},
|
||||
subnets: []*subnet{},
|
||||
}
|
||||
|
||||
return nil
|
||||
err := d.store.GetObject(datastore.Key(n.Key()...), n)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get network %q from data store, %v", nid, err)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (d *driver) DeleteNetwork(nid string) error {
|
||||
|
@ -100,13 +141,21 @@ func (n *network) joinSandbox() error {
|
|||
n.initErr = n.initSandbox()
|
||||
})
|
||||
|
||||
return n.initErr
|
||||
}
|
||||
|
||||
func (n *network) joinSubnetSandbox(s *subnet) error {
|
||||
|
||||
s.once.Do(func() {
|
||||
s.initErr = n.initSubnetSandbox(s)
|
||||
})
|
||||
// Increment joinCnt in all the goroutines only when the one time initSandbox
|
||||
// was a success.
|
||||
n.Lock()
|
||||
if n.initErr == nil {
|
||||
if s.initErr == nil {
|
||||
n.joinCnt++
|
||||
}
|
||||
err := n.initErr
|
||||
err := s.initErr
|
||||
n.Unlock()
|
||||
|
||||
return err
|
||||
|
@ -124,6 +173,9 @@ func (n *network) leaveSandbox() {
|
|||
// Reinitialize the once variable so that we will be able to trigger one time
|
||||
// sandbox initialization(again) when another container joins subsequently.
|
||||
n.once = &sync.Once{}
|
||||
for _, s := range n.subnets {
|
||||
s.once = &sync.Once{}
|
||||
}
|
||||
n.Unlock()
|
||||
|
||||
n.destroySandbox()
|
||||
|
@ -136,14 +188,50 @@ func (n *network) destroySandbox() {
|
|||
iface.Remove()
|
||||
}
|
||||
|
||||
if err := deleteVxlan(n.vxlanName); err != nil {
|
||||
logrus.Warnf("could not cleanup sandbox properly: %v", err)
|
||||
for _, s := range n.subnets {
|
||||
if s.vxlanName != "" {
|
||||
err := deleteVxlan(s.vxlanName)
|
||||
if err != nil {
|
||||
logrus.Warnf("could not cleanup sandbox properly: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sbox.Destroy()
|
||||
}
|
||||
}
|
||||
|
||||
func (n *network) initSubnetSandbox(s *subnet) error {
|
||||
// create a bridge and vxlan device for this subnet and move it to the sandbox
|
||||
brName, err := netutils.GenerateIfaceName("bridge", 7)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sbox := n.sandbox()
|
||||
|
||||
if err := sbox.AddInterface(brName, "br",
|
||||
sbox.InterfaceOptions().Address(s.gwIP),
|
||||
sbox.InterfaceOptions().Bridge(true)); err != nil {
|
||||
return fmt.Errorf("bridge creation in sandbox failed for subnet %q: %v", s.subnetIP.IP.String(), err)
|
||||
}
|
||||
|
||||
vxlanName, err := createVxlan(n.vxlanID(s))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := sbox.AddInterface(vxlanName, "vxlan",
|
||||
sbox.InterfaceOptions().Master(brName)); err != nil {
|
||||
return fmt.Errorf("vxlan interface creation failed for subnet %q: %v", s.subnetIP.IP.String(), err)
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
s.vxlanName = vxlanName
|
||||
s.brName = brName
|
||||
n.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *network) initSandbox() error {
|
||||
n.Lock()
|
||||
n.initEpoch++
|
||||
|
@ -155,15 +243,10 @@ func (n *network) initSandbox() error {
|
|||
return fmt.Errorf("could not create network sandbox: %v", err)
|
||||
}
|
||||
|
||||
// Add a bridge inside the namespace
|
||||
if err := sbox.AddInterface("bridge1", "br",
|
||||
sbox.InterfaceOptions().Address(n.gateways[0]),
|
||||
sbox.InterfaceOptions().Bridge(true)); err != nil {
|
||||
return fmt.Errorf("could not create bridge inside the network sandbox: %v", err)
|
||||
}
|
||||
|
||||
n.setSandbox(sbox)
|
||||
|
||||
n.driver.peerDbUpdateSandbox(n.id)
|
||||
|
||||
var nlSock *nl.NetlinkSocket
|
||||
sbox.InvokeFunc(func() {
|
||||
nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
|
||||
|
@ -173,27 +256,6 @@ func (n *network) initSandbox() error {
|
|||
})
|
||||
|
||||
go n.watchMiss(nlSock)
|
||||
return n.initVxlan()
|
||||
}
|
||||
|
||||
func (n *network) initVxlan() error {
|
||||
var vxlanName string
|
||||
n.Lock()
|
||||
sbox := n.sbox
|
||||
n.Unlock()
|
||||
|
||||
vxlanName, err := createVxlan(n.vxlanID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = sbox.AddInterface(vxlanName, "vxlan",
|
||||
sbox.InterfaceOptions().Master("bridge1")); err != nil {
|
||||
return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v", err)
|
||||
}
|
||||
|
||||
n.vxlanName = vxlanName
|
||||
n.driver.peerDbUpdateSandbox(n.id)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -224,14 +286,14 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
|
|||
continue
|
||||
}
|
||||
|
||||
mac, vtep, err := n.driver.resolvePeer(n.id, neigh.IP)
|
||||
mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, neigh.IP)
|
||||
if err != nil {
|
||||
logrus.Errorf("could not resolve peer %q: %v", neigh.IP, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := n.driver.peerAdd(n.id, "dummy", neigh.IP, mac, vtep, true); err != nil {
|
||||
logrus.Errorf("could not add neighbor entry for missed peer: %v", err)
|
||||
if err := n.driver.peerAdd(n.id, "dummy", neigh.IP, IPmask, mac, vtep, true); err != nil {
|
||||
logrus.Errorf("could not add neighbor entry for missed peer %q: %v", neigh.IP, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -269,16 +331,16 @@ func (n *network) setSandbox(sbox osl.Sandbox) {
|
|||
n.Unlock()
|
||||
}
|
||||
|
||||
func (n *network) vxlanID() uint32 {
|
||||
func (n *network) vxlanID(s *subnet) uint32 {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
return n.vni
|
||||
return s.vni
|
||||
}
|
||||
|
||||
func (n *network) setVxlanID(vni uint32) {
|
||||
func (n *network) setVxlanID(s *subnet, vni uint32) {
|
||||
n.Lock()
|
||||
n.vni = vni
|
||||
s.vni = vni
|
||||
n.Unlock()
|
||||
}
|
||||
|
||||
|
@ -291,7 +353,19 @@ func (n *network) KeyPrefix() []string {
|
|||
}
|
||||
|
||||
func (n *network) Value() []byte {
|
||||
b, err := json.Marshal(n.vxlanID())
|
||||
overlayNetmap := make(map[string]interface{})
|
||||
|
||||
s := n.subnets[0]
|
||||
if s == nil {
|
||||
logrus.Errorf("Network %s has no subnets", n.id)
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
overlayNetmap["subnetIP"] = s.subnetIP.String()
|
||||
overlayNetmap["gwIP"] = s.gwIP.String()
|
||||
overlayNetmap["vni"] = s.vni
|
||||
|
||||
b, err := json.Marshal(overlayNetmap)
|
||||
if err != nil {
|
||||
return []byte{}
|
||||
}
|
||||
|
@ -317,12 +391,44 @@ func (n *network) Skip() bool {
|
|||
}
|
||||
|
||||
func (n *network) SetValue(value []byte) error {
|
||||
var vni uint32
|
||||
err := json.Unmarshal(value, &vni)
|
||||
if err == nil {
|
||||
n.setVxlanID(vni)
|
||||
var (
|
||||
overlayNetmap map[string]interface{}
|
||||
err error
|
||||
)
|
||||
|
||||
err = json.Unmarshal(value, &overlayNetmap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
|
||||
subnetIPstr := overlayNetmap["subnetIP"].(string)
|
||||
gwIPstr := overlayNetmap["gwIP"].(string)
|
||||
vni := uint32(overlayNetmap["vni"].(float64))
|
||||
|
||||
subnetIP, _ := types.ParseCIDR(subnetIPstr)
|
||||
gwIP, _ := types.ParseCIDR(gwIPstr)
|
||||
|
||||
// If the network is being created by reading from the
|
||||
// datastore subnets have to created. If the network
|
||||
// already exists update only the subnets' vni field
|
||||
if len(n.subnets) == 0 {
|
||||
s := &subnet{
|
||||
subnetIP: subnetIP,
|
||||
gwIP: gwIP,
|
||||
vni: vni,
|
||||
once: &sync.Once{},
|
||||
}
|
||||
n.subnets = append(n.subnets, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
sNet := n.getMatchingSubnet(subnetIP)
|
||||
if sNet != nil {
|
||||
if vni != 0 {
|
||||
sNet.vni = vni
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *network) DataScope() datastore.DataScope {
|
||||
|
@ -338,7 +444,7 @@ func (n *network) releaseVxlanID() error {
|
|||
return fmt.Errorf("no datastore configured. cannot release vxlan id")
|
||||
}
|
||||
|
||||
if n.vxlanID() == 0 {
|
||||
if len(n.subnets) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -352,38 +458,80 @@ func (n *network) releaseVxlanID() error {
|
|||
return fmt.Errorf("failed to delete network to vxlan id map: %v", err)
|
||||
}
|
||||
|
||||
n.driver.vxlanIdm.Release(n.vxlanID())
|
||||
n.setVxlanID(0)
|
||||
for _, s := range n.subnets {
|
||||
n.driver.vxlanIdm.Release(n.vxlanID(s))
|
||||
n.setVxlanID(s, 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *network) obtainVxlanID() error {
|
||||
func (n *network) obtainVxlanID(s *subnet) error {
|
||||
//return if the subnet already has a vxlan id assigned
|
||||
if s.vni != 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if n.driver.store == nil {
|
||||
return fmt.Errorf("no datastore configured. cannot obtain vxlan id")
|
||||
}
|
||||
|
||||
for {
|
||||
var vxlanID uint32
|
||||
if err := n.driver.store.GetObject(datastore.Key(n.Key()...), n); err != nil {
|
||||
if err == datastore.ErrKeyNotFound {
|
||||
vxlanID, err = n.driver.vxlanIdm.GetID()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to allocate vxlan id: %v", err)
|
||||
}
|
||||
return fmt.Errorf("getting network %q from datastore failed %v", n.id, err)
|
||||
}
|
||||
|
||||
n.setVxlanID(vxlanID)
|
||||
if err := n.writeToStore(); err != nil {
|
||||
n.driver.vxlanIdm.Release(n.vxlanID())
|
||||
n.setVxlanID(0)
|
||||
if err == datastore.ErrKeyModified {
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("failed to update data store with vxlan id: %v", err)
|
||||
}
|
||||
return nil
|
||||
if s.vni == 0 {
|
||||
vxlanID, err := n.driver.vxlanIdm.GetID()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to allocate vxlan id: %v", err)
|
||||
}
|
||||
return fmt.Errorf("failed to obtain vxlan id from data store: %v", err)
|
||||
|
||||
n.setVxlanID(s, vxlanID)
|
||||
if err := n.writeToStore(); err != nil {
|
||||
n.driver.vxlanIdm.Release(n.vxlanID(s))
|
||||
n.setVxlanID(s, 0)
|
||||
if err == datastore.ErrKeyModified {
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("network %q failed to update data store: %v", n.id, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// getSubnetforIP returns the subnet to which the given IP belongs
|
||||
func (n *network) getSubnetforIP(ip *net.IPNet) *subnet {
|
||||
for _, s := range n.subnets {
|
||||
// first check if the mask lengths are the same
|
||||
i, _ := s.subnetIP.Mask.Size()
|
||||
j, _ := ip.Mask.Size()
|
||||
if i != j {
|
||||
continue
|
||||
}
|
||||
if s.subnetIP.Contains(ip.IP) {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getMatchingSubnet return the network's subnet that matches the input
|
||||
func (n *network) getMatchingSubnet(ip *net.IPNet) *subnet {
|
||||
if ip == nil {
|
||||
return nil
|
||||
}
|
||||
for _, s := range n.subnets {
|
||||
// first check if the mask lengths are the same
|
||||
i, _ := s.subnetIP.Mask.Size()
|
||||
j, _ := ip.Mask.Size()
|
||||
if i != j {
|
||||
continue
|
||||
}
|
||||
if s.subnetIP.IP.Equal(ip.IP) {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -83,7 +83,8 @@ func (d *driver) notifyEvent(event ovNotify) {
|
|||
n := d.network(event.nid)
|
||||
ep := n.endpoint(event.eid)
|
||||
|
||||
ePayload := fmt.Sprintf("%s %s %s", event.action, ep.addr.IP.String(), ep.mac.String())
|
||||
ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
|
||||
net.IP(ep.addr.Mask).String(), ep.mac.String())
|
||||
eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
|
||||
event.nid, event.eid)
|
||||
|
||||
|
@ -96,17 +97,17 @@ func (d *driver) processEvent(u serf.UserEvent) {
|
|||
fmt.Printf("Received user event name:%s, payload:%s\n", u.Name,
|
||||
string(u.Payload))
|
||||
|
||||
var dummy, action, vtepStr, nid, eid, ipStr, macStr string
|
||||
var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
|
||||
if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
|
||||
fmt.Printf("Failed to scan name string: %v\n", err)
|
||||
}
|
||||
|
||||
if _, err := fmt.Sscan(string(u.Payload), &action,
|
||||
&ipStr, &macStr); err != nil {
|
||||
&ipStr, &maskStr, &macStr); err != nil {
|
||||
fmt.Printf("Failed to scan value string: %v\n", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Parsed data = %s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, macStr)
|
||||
fmt.Printf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr)
|
||||
|
||||
mac, err := net.ParseMAC(macStr)
|
||||
if err != nil {
|
||||
|
@ -119,12 +120,12 @@ func (d *driver) processEvent(u serf.UserEvent) {
|
|||
|
||||
switch action {
|
||||
case "join":
|
||||
if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), mac,
|
||||
if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
|
||||
net.ParseIP(vtepStr), true); err != nil {
|
||||
fmt.Printf("Peer add failed in the driver: %v\n", err)
|
||||
}
|
||||
case "leave":
|
||||
if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), mac,
|
||||
if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
|
||||
net.ParseIP(vtepStr), true); err != nil {
|
||||
fmt.Printf("Peer delete failed in the driver: %v\n", err)
|
||||
}
|
||||
|
@ -140,38 +141,38 @@ func (d *driver) processQuery(q *serf.Query) {
|
|||
fmt.Printf("Failed to scan query payload string: %v\n", err)
|
||||
}
|
||||
|
||||
peerMac, vtep, err := d.peerDbSearch(nid, net.ParseIP(ipStr))
|
||||
peerMac, peerIPMask, vtep, err := d.peerDbSearch(nid, net.ParseIP(ipStr))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
q.Respond([]byte(fmt.Sprintf("%s %s", peerMac.String(), vtep.String())))
|
||||
q.Respond([]byte(fmt.Sprintf("%s %s %s", peerMac.String(), net.IP(peerIPMask).String(), vtep.String())))
|
||||
}
|
||||
|
||||
func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IP, error) {
|
||||
func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) {
|
||||
qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String())
|
||||
resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
|
||||
return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
|
||||
}
|
||||
|
||||
respCh := resp.ResponseCh()
|
||||
select {
|
||||
case r := <-respCh:
|
||||
var macStr, vtepStr string
|
||||
if _, err := fmt.Sscan(string(r.Payload), &macStr, &vtepStr); err != nil {
|
||||
return nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
|
||||
var macStr, maskStr, vtepStr string
|
||||
if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
|
||||
}
|
||||
|
||||
mac, err := net.ParseMAC(macStr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to parse mac: %v", err)
|
||||
return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
|
||||
}
|
||||
|
||||
return mac, net.ParseIP(vtepStr), nil
|
||||
return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil
|
||||
|
||||
case <-time.After(time.Second):
|
||||
return nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
|
||||
return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,10 +13,11 @@ type peerKey struct {
|
|||
}
|
||||
|
||||
type peerEntry struct {
|
||||
eid string
|
||||
vtep net.IP
|
||||
inSandbox bool
|
||||
isLocal bool
|
||||
eid string
|
||||
vtep net.IP
|
||||
peerIPMask net.IPMask
|
||||
inSandbox bool
|
||||
isLocal bool
|
||||
}
|
||||
|
||||
type peerMap struct {
|
||||
|
@ -98,16 +99,18 @@ func (d *driver) peerDbNetworkWalk(nid string, f func(*peerKey, *peerEntry) bool
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.IP, error) {
|
||||
func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) {
|
||||
var (
|
||||
peerMac net.HardwareAddr
|
||||
vtep net.IP
|
||||
found bool
|
||||
peerMac net.HardwareAddr
|
||||
vtep net.IP
|
||||
peerIPMask net.IPMask
|
||||
found bool
|
||||
)
|
||||
|
||||
err := d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
|
||||
if pKey.peerIP.Equal(peerIP) {
|
||||
peerMac = pKey.peerMac
|
||||
peerIPMask = pEntry.peerIPMask
|
||||
vtep = pEntry.vtep
|
||||
found = true
|
||||
return found
|
||||
|
@ -117,17 +120,17 @@ func (d *driver) peerDbSearch(nid string, peerIP net.IP) (net.HardwareAddr, net.
|
|||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err)
|
||||
return nil, nil, nil, fmt.Errorf("peerdb search for peer ip %q failed: %v", peerIP, err)
|
||||
}
|
||||
|
||||
if !found {
|
||||
return nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP)
|
||||
return nil, nil, nil, fmt.Errorf("peer ip %q not found in peerdb", peerIP)
|
||||
}
|
||||
|
||||
return peerMac, vtep, nil
|
||||
return peerMac, peerIPMask, vtep, nil
|
||||
}
|
||||
|
||||
func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP,
|
||||
func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
|
||||
peerMac net.HardwareAddr, vtep net.IP, isLocal bool) {
|
||||
|
||||
peerDbWg.Wait()
|
||||
|
@ -149,9 +152,10 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP,
|
|||
}
|
||||
|
||||
pEntry := peerEntry{
|
||||
eid: eid,
|
||||
vtep: vtep,
|
||||
isLocal: isLocal,
|
||||
eid: eid,
|
||||
vtep: vtep,
|
||||
peerIPMask: peerIPMask,
|
||||
isLocal: isLocal,
|
||||
}
|
||||
|
||||
pMap.Lock()
|
||||
|
@ -159,7 +163,7 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP,
|
|||
pMap.Unlock()
|
||||
}
|
||||
|
||||
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP,
|
||||
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
|
||||
peerMac net.HardwareAddr, vtep net.IP) {
|
||||
peerDbWg.Wait()
|
||||
|
||||
|
@ -209,7 +213,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
|
|||
// a copy of pEntry before capturing it in the following closure.
|
||||
entry := pEntry
|
||||
op := func() {
|
||||
if err := d.peerAdd(nid, entry.eid, pKey.peerIP,
|
||||
if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
|
||||
pKey.peerMac, entry.vtep,
|
||||
false); err != nil {
|
||||
fmt.Printf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
|
||||
|
@ -228,7 +232,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
|
|||
peerDbWg.Done()
|
||||
}
|
||||
|
||||
func (d *driver) peerAdd(nid, eid string, peerIP net.IP,
|
||||
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
|
||||
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
|
||||
|
||||
if err := validateID(nid, eid); err != nil {
|
||||
|
@ -236,7 +240,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP,
|
|||
}
|
||||
|
||||
if updateDb {
|
||||
d.peerDbAdd(nid, eid, peerIP, peerMac, vtep, false)
|
||||
d.peerDbAdd(nid, eid, peerIP, peerIPMask, peerMac, vtep, false)
|
||||
}
|
||||
|
||||
n := d.network(nid)
|
||||
|
@ -249,13 +253,31 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP,
|
|||
return nil
|
||||
}
|
||||
|
||||
IP := &net.IPNet{
|
||||
IP: peerIP,
|
||||
Mask: peerIPMask,
|
||||
}
|
||||
|
||||
s := n.getSubnetforIP(IP)
|
||||
if s == nil {
|
||||
return fmt.Errorf("couldn't find the subnet %q in network %q\n", IP.String(), n.id)
|
||||
}
|
||||
|
||||
if err := n.obtainVxlanID(s); err != nil {
|
||||
return fmt.Errorf("couldn't get vxlan id for %q: %v", s.subnetIP.String(), err)
|
||||
}
|
||||
|
||||
if err := n.joinSubnetSandbox(s); err != nil {
|
||||
return fmt.Errorf("subnet sandbox join failed for %q: %v", s.subnetIP.String(), err)
|
||||
}
|
||||
|
||||
// Add neighbor entry for the peer IP
|
||||
if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(n.vxlanName)); err != nil {
|
||||
if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
|
||||
return fmt.Errorf("could not add neigbor entry into the sandbox: %v", err)
|
||||
}
|
||||
|
||||
// Add fdb entry to the bridge for the peer mac
|
||||
if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(n.vxlanName),
|
||||
if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName),
|
||||
sbox.NeighborOptions().Family(syscall.AF_BRIDGE)); err != nil {
|
||||
return fmt.Errorf("could not add fdb entry into the sandbox: %v", err)
|
||||
}
|
||||
|
@ -263,7 +285,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) peerDelete(nid, eid string, peerIP net.IP,
|
||||
func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
|
||||
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
|
||||
|
||||
if err := validateID(nid, eid); err != nil {
|
||||
|
@ -271,7 +293,7 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP,
|
|||
}
|
||||
|
||||
if updateDb {
|
||||
d.peerDbDelete(nid, eid, peerIP, peerMac, vtep)
|
||||
d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep)
|
||||
}
|
||||
|
||||
n := d.network(nid)
|
||||
|
|
Loading…
Add table
Reference in a new issue