Merge pull request #1636 from msabansal/overlayfix

Cleaning up windows overlay network driver code and making it rely on HNS network information
This commit is contained in:
Madhu Venugopal 2017-02-14 11:03:07 -08:00 committed by GitHub
commit 1b8c4b421b
8 changed files with 194 additions and 1030 deletions

View File

@ -26,10 +26,6 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
return fmt.Errorf("could not find endpoint with id %s", eid)
}
if err := d.writeEndpointToStore(ep); err != nil {
return fmt.Errorf("failed to update overlay endpoint %s to local data store: %v", ep.id[0:7], err)
}
buf, err := proto.Marshal(&PeerRecord{
EndpointIP: ep.addr.String(),
EndpointMAC: ep.mac.String(),
@ -43,9 +39,6 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
if err := jinfo.AddTableEntry(ovPeerTable, eid, buf); err != nil {
logrus.Errorf("overlay: Failed adding table entry to joininfo: %v", err)
}
d.pushLocalEndpointEvent("join", nid, eid)
return nil
}
@ -106,7 +99,5 @@ func (d *driver) Leave(nid, eid string) error {
return err
}
d.pushLocalEndpointEvent("leave", nid, eid)
return nil
}

View File

@ -7,9 +7,7 @@ import (
"github.com/Microsoft/hcsshim"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/types"
)
type endpointTable map[string]*endpoint
@ -23,8 +21,6 @@ type endpoint struct {
remote bool
mac net.HardwareAddr
addr *net.IPNet
dbExists bool
dbIndex uint64
}
func validateID(nid, eid string) error {
@ -67,6 +63,7 @@ func (n *network) removeEndpointWithAddress(addr *net.IPNet) {
break
}
}
if networkEndpoint != nil {
delete(n.endpoints, networkEndpoint.id)
}
@ -79,10 +76,6 @@ func (n *network) removeEndpointWithAddress(addr *net.IPNet) {
if err != nil {
logrus.Debugf("Failed to delete stale overlay endpoint (%s) from hns", networkEndpoint.id[0:7])
}
if err := n.driver.deleteEndpointFromStore(networkEndpoint); err != nil {
logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", networkEndpoint.id[0:7])
}
}
}
@ -93,19 +86,23 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
return err
}
// Since we perform lazy configuration make sure we try
// configuring the driver when we enter CreateEndpoint since
// CreateNetwork may not be called in every node.
if err := d.configure(); err != nil {
return err
}
n := d.network(nid)
if n == nil {
return fmt.Errorf("network id %q not found", nid)
}
ep := &endpoint{
ep := n.endpoint(eid)
if ep != nil {
logrus.Debugf("Deleting stale endpoint %s", eid)
n.deleteEndpoint(eid)
_, err := hcsshim.HNSEndpointRequest("DELETE", ep.profileId, "")
if err != nil {
return err
}
}
ep = &endpoint{
id: eid,
nid: n.id,
addr: ifInfo.Address(),
@ -123,6 +120,7 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
// Todo: Add port bindings and qos policies here
hnsEndpoint := &hcsshim.HNSEndpoint{
Name: eid,
VirtualNetwork: n.hnsId,
IPAddress: ep.addr.IP,
EnableInternalDNS: true,
@ -167,9 +165,6 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
}
n.addEndpoint(ep)
if err := d.writeEndpointToStore(ep); err != nil {
return fmt.Errorf("failed to update overlay endpoint %s to local store: %v", ep.id[0:7], err)
}
return nil
}
@ -191,10 +186,6 @@ func (d *driver) DeleteEndpoint(nid, eid string) error {
n.deleteEndpoint(eid)
if err := d.deleteEndpointFromStore(ep); err != nil {
logrus.Warnf("Failed to delete overlay endpoint %s from local store: %v", ep.id[0:7], err)
}
_, err := hcsshim.HNSEndpointRequest("DELETE", ep.profileId, "")
if err != nil {
return err
@ -223,124 +214,3 @@ func (d *driver) EndpointOperInfo(nid, eid string) (map[string]interface{}, erro
data["AllowUnqualifiedDNSQuery"] = true
return data, nil
}
func (d *driver) deleteEndpointFromStore(e *endpoint) error {
if d.localStore == nil {
return fmt.Errorf("overlay local store not initialized, ep not deleted")
}
if err := d.localStore.DeleteObjectAtomic(e); err != nil {
return err
}
return nil
}
func (d *driver) writeEndpointToStore(e *endpoint) error {
if d.localStore == nil {
return fmt.Errorf("overlay local store not initialized, ep not added")
}
if err := d.localStore.PutObjectAtomic(e); err != nil {
return err
}
return nil
}
func (ep *endpoint) DataScope() string {
return datastore.LocalScope
}
func (ep *endpoint) New() datastore.KVObject {
return &endpoint{}
}
func (ep *endpoint) CopyTo(o datastore.KVObject) error {
dstep := o.(*endpoint)
*dstep = *ep
return nil
}
func (ep *endpoint) Key() []string {
return []string{overlayEndpointPrefix, ep.id}
}
func (ep *endpoint) KeyPrefix() []string {
return []string{overlayEndpointPrefix}
}
func (ep *endpoint) Index() uint64 {
return ep.dbIndex
}
func (ep *endpoint) SetIndex(index uint64) {
ep.dbIndex = index
ep.dbExists = true
}
func (ep *endpoint) Exists() bool {
return ep.dbExists
}
func (ep *endpoint) Skip() bool {
return false
}
func (ep *endpoint) Value() []byte {
b, err := json.Marshal(ep)
if err != nil {
return nil
}
return b
}
func (ep *endpoint) SetValue(value []byte) error {
return json.Unmarshal(value, ep)
}
func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap := make(map[string]interface{})
epMap["id"] = ep.id
epMap["nid"] = ep.nid
epMap["remote"] = ep.remote
if ep.profileId != "" {
epMap["profileId"] = ep.profileId
}
if ep.addr != nil {
epMap["addr"] = ep.addr.String()
}
if len(ep.mac) != 0 {
epMap["mac"] = ep.mac.String()
}
return json.Marshal(epMap)
}
func (ep *endpoint) UnmarshalJSON(value []byte) error {
var (
err error
epMap map[string]interface{}
)
json.Unmarshal(value, &epMap)
ep.id = epMap["id"].(string)
ep.nid = epMap["nid"].(string)
ep.remote = epMap["remote"].(bool)
if v, ok := epMap["profileId"]; ok {
ep.profileId = v.(string)
}
if v, ok := epMap["mac"]; ok {
if ep.mac, err = net.ParseMAC(v.(string)); err != nil {
return types.InternalErrorf("failed to decode endpoint interface mac address after json unmarshal: %s", v.(string))
}
}
if v, ok := epMap["addr"]; ok {
if ep.addr, err = types.ParseCIDR(v.(string)); err != nil {
return types.InternalErrorf("failed to decode endpoint interface ipv4 address after json unmarshal: %v", err)
}
}
return nil
}

View File

@ -1,209 +0,0 @@
package overlay
import (
"encoding/json"
"fmt"
"sync"
"github.com/Microsoft/hcsshim"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/datastore"
)
const overlayNetworkPrefix = "overlay/network"
type localNetwork struct {
id string
hnsID string
providerAddress string
dbIndex uint64
dbExists bool
sync.Mutex
}
func (d *driver) findHnsNetwork(n *network) error {
ln, err := d.getLocalNetworkFromStore(n.id)
if err != nil {
return err
}
if ln == nil {
subnets := []hcsshim.Subnet{}
for _, s := range n.subnets {
subnet := hcsshim.Subnet{
AddressPrefix: s.subnetIP.String(),
}
if s.gwIP != nil {
subnet.GatewayAddress = s.gwIP.IP.String()
}
vsidPolicy, err := json.Marshal(hcsshim.VsidPolicy{
Type: "VSID",
VSID: uint(s.vni),
})
if err != nil {
return err
}
subnet.Policies = append(subnet.Policies, vsidPolicy)
subnets = append(subnets, subnet)
}
network := &hcsshim.HNSNetwork{
Name: n.name,
Type: d.Type(),
Subnets: subnets,
NetworkAdapterName: n.interfaceName,
}
configurationb, err := json.Marshal(network)
if err != nil {
return err
}
configuration := string(configurationb)
logrus.Infof("HNSNetwork Request =%v", configuration)
hnsresponse, err := hcsshim.HNSNetworkRequest("POST", "", configuration)
if err != nil {
return err
}
n.hnsId = hnsresponse.Id
n.providerAddress = hnsresponse.ManagementIP
// Save local host specific info
if err := d.writeLocalNetworkToStore(n); err != nil {
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
}
} else {
n.hnsId = ln.hnsID
n.providerAddress = ln.providerAddress
}
return nil
}
func (d *driver) getLocalNetworkFromStore(nid string) (*localNetwork, error) {
if d.localStore == nil {
return nil, fmt.Errorf("overlay local store not initialized, network not found")
}
n := &localNetwork{id: nid}
if err := d.localStore.GetObject(datastore.Key(n.Key()...), n); err != nil {
return nil, nil
}
return n, nil
}
func (d *driver) deleteLocalNetworkFromStore(n *network) error {
if d.localStore == nil {
return fmt.Errorf("overlay local store not initialized, network not deleted")
}
ln, err := d.getLocalNetworkFromStore(n.id)
if err != nil {
return err
}
if err = d.localStore.DeleteObjectAtomic(ln); err != nil {
return err
}
return nil
}
func (d *driver) writeLocalNetworkToStore(n *network) error {
if d.localStore == nil {
return fmt.Errorf("overlay local store not initialized, network not added")
}
ln := &localNetwork{
id: n.id,
hnsID: n.hnsId,
providerAddress: n.providerAddress,
}
if err := d.localStore.PutObjectAtomic(ln); err != nil {
return err
}
return nil
}
func (n *localNetwork) DataScope() string {
return datastore.LocalScope
}
func (n *localNetwork) New() datastore.KVObject {
return &localNetwork{}
}
func (n *localNetwork) CopyTo(o datastore.KVObject) error {
dstep := o.(*localNetwork)
*dstep = *n
return nil
}
func (n *localNetwork) Key() []string {
return []string{overlayNetworkPrefix, n.id}
}
func (n *localNetwork) KeyPrefix() []string {
return []string{overlayNetworkPrefix}
}
func (n *localNetwork) Index() uint64 {
return n.dbIndex
}
func (n *localNetwork) SetIndex(index uint64) {
n.dbIndex = index
n.dbExists = true
}
func (n *localNetwork) Exists() bool {
return n.dbExists
}
func (n *localNetwork) Skip() bool {
return false
}
func (n *localNetwork) Value() []byte {
b, err := json.Marshal(n)
if err != nil {
return nil
}
return b
}
func (n *localNetwork) SetValue(value []byte) error {
return json.Unmarshal(value, n)
}
func (n *localNetwork) MarshalJSON() ([]byte, error) {
networkMap := make(map[string]interface{})
networkMap["id"] = n.id
networkMap["hnsID"] = n.hnsID
networkMap["providerAddress"] = n.providerAddress
return json.Marshal(networkMap)
}
func (n *localNetwork) UnmarshalJSON(value []byte) error {
var networkMap map[string]interface{}
json.Unmarshal(value, &networkMap)
n.id = networkMap["id"].(string)
n.hnsID = networkMap["hnsID"].(string)
n.providerAddress = networkMap["providerAddress"].(string)
return nil
}

View File

@ -10,7 +10,6 @@ import (
"github.com/Microsoft/hcsshim"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/types"
@ -25,9 +24,8 @@ type networkTable map[string]*network
type subnet struct {
vni uint32
initErr error
subnetIP *net.IPNet
gwIP *net.IPNet
gwIP *net.IP
}
type subnetJSON struct {
@ -40,8 +38,6 @@ type network struct {
id string
name string
hnsId string
dbIndex uint64
dbExists bool
providerAddress string
interfaceName string
endpoints endpointTable
@ -65,22 +61,31 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
var (
networkName string
interfaceName string
staleNetworks []string
)
if id == "" {
return fmt.Errorf("invalid network id")
}
if nInfo == nil {
return fmt.Errorf("invalid network info structure")
}
if len(ipV4Data) == 0 || ipV4Data[0].Pool.String() == "0.0.0.0/0" {
return types.BadRequestErrorf("ipv4 pool is empty")
}
staleNetworks = make([]string, 0)
vnis := make([]uint32, 0, len(ipV4Data))
// Since we perform lazy configuration make sure we try
// configuring the driver when we enter CreateNetwork
if err := d.configure(); err != nil {
return err
existingNetwork := d.network(id)
if existingNetwork != nil {
logrus.Debugf("Network preexists. Deleting %s", id)
err := d.DeleteNetwork(id)
if err != nil {
logrus.Errorf("Error deleting stale network %s", err.Error())
}
}
n := &network{
@ -119,23 +124,43 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
// If we are getting vnis from libnetwork, either we get for
// all subnets or none.
if len(vnis) != 0 && len(vnis) < len(ipV4Data) {
return fmt.Errorf("insufficient vnis(%d) passed to overlay", len(vnis))
if len(vnis) < len(ipV4Data) {
return fmt.Errorf("insufficient vnis(%d) passed to overlay. Windows driver requires VNIs to be prepopulated", len(vnis))
}
for i, ipd := range ipV4Data {
s := &subnet{
subnetIP: ipd.Pool,
gwIP: ipd.Gateway,
gwIP: &ipd.Gateway.IP,
}
if len(vnis) != 0 {
s.vni = vnis[i]
}
d.Lock()
for _, network := range d.networks {
found := false
for _, sub := range network.subnets {
if sub.vni == s.vni {
staleNetworks = append(staleNetworks, network.id)
found = true
break
}
}
if found {
break
}
}
d.Unlock()
n.subnets = append(n.subnets, s)
}
for _, staleNetwork := range staleNetworks {
d.DeleteNetwork(staleNetwork)
}
n.name = networkName
if n.name == "" {
n.name = id
@ -143,10 +168,6 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
n.interfaceName = interfaceName
if err := n.writeToStore(); err != nil {
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
}
if nInfo != nil {
if err := nInfo.TableEventRegister(ovPeerTable); err != nil {
return err
@ -155,8 +176,13 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
d.addNetwork(n)
err := d.findHnsNetwork(n)
genData["com.docker.network.windowsshim.hnsid"] = n.hnsId
err := d.createHnsNetwork(n)
if err != nil {
d.deleteNetwork(id)
} else {
genData["com.docker.network.windowsshim.hnsid"] = n.hnsId
}
return err
}
@ -166,23 +192,17 @@ func (d *driver) DeleteNetwork(nid string) error {
return fmt.Errorf("invalid network id")
}
// Make sure driver resources are initialized before proceeding
if err := d.configure(); err != nil {
return err
}
n := d.network(nid)
if n == nil {
return fmt.Errorf("could not find network with id %s", nid)
return types.ForbiddenErrorf("could not find network with id %s", nid)
}
_, err := hcsshim.HNSNetworkRequest("DELETE", n.hnsId, "")
if err != nil {
return err
return types.ForbiddenErrorf(err.Error())
}
d.deleteNetwork(nid)
d.deleteLocalNetworkFromStore(n)
return nil
}
@ -209,261 +229,109 @@ func (d *driver) deleteNetwork(nid string) {
func (d *driver) network(nid string) *network {
d.Lock()
networks := d.networks
d.Unlock()
n, ok := networks[nid]
if !ok {
n = d.getNetworkFromStore(nid)
if n != nil {
n.driver = d
n.endpoints = endpointTable{}
networks[nid] = n
}
}
return n
defer d.Unlock()
return d.networks[nid]
}
func (d *driver) getNetworkFromStore(nid string) *network {
if d.store == nil {
return nil
// func (n *network) restoreNetworkEndpoints() error {
// logrus.Infof("Restoring endpoints for overlay network: %s", n.id)
// hnsresponse, err := hcsshim.HNSListEndpointRequest("GET", "", "")
// if err != nil {
// return err
// }
// for _, endpoint := range hnsresponse {
// if endpoint.VirtualNetwork != n.hnsId {
// continue
// }
// ep := n.convertToOverlayEndpoint(&endpoint)
// if ep != nil {
// logrus.Debugf("Restored endpoint:%s Remote:%t", ep.id, ep.remote)
// n.addEndpoint(ep)
// }
// }
// return nil
// }
func (n *network) convertToOverlayEndpoint(v *hcsshim.HNSEndpoint) *endpoint {
ep := &endpoint{
id: v.Name,
profileId: v.Id,
nid: n.id,
remote: v.IsRemoteEndpoint,
}
n := &network{id: nid}
if err := d.store.GetObject(datastore.Key(n.Key()...), n); err != nil {
return nil
}
mac, err := net.ParseMAC(v.MacAddress)
// As the network is being discovered from the global store, HNS may not be aware of it yet
err := d.findHnsNetwork(n)
if err != nil {
logrus.Errorf("Failed to find hns network: %v", err)
return nil
}
return n
ep.mac = mac
ep.addr = &net.IPNet{
IP: v.IPAddress,
Mask: net.CIDRMask(32, 32),
}
return ep
}
func (n *network) vxlanID(s *subnet) uint32 {
n.Lock()
defer n.Unlock()
func (d *driver) createHnsNetwork(n *network) error {
return s.vni
}
func (n *network) setVxlanID(s *subnet, vni uint32) {
n.Lock()
s.vni = vni
n.Unlock()
}
func (n *network) Key() []string {
return []string{"overlay", "network", n.id}
}
func (n *network) KeyPrefix() []string {
return []string{"overlay", "network"}
}
func (n *network) Value() []byte {
m := map[string]interface{}{}
netJSON := []*subnetJSON{}
subnets := []hcsshim.Subnet{}
for _, s := range n.subnets {
sj := &subnetJSON{
SubnetIP: s.subnetIP.String(),
GwIP: s.gwIP.String(),
Vni: s.vni,
subnet := hcsshim.Subnet{
AddressPrefix: s.subnetIP.String(),
}
netJSON = append(netJSON, sj)
}
b, err := json.Marshal(netJSON)
if err != nil {
return []byte{}
}
if s.gwIP != nil {
subnet.GatewayAddress = s.gwIP.String()
}
m["secure"] = n.secure
m["subnets"] = netJSON
m["interfaceName"] = n.interfaceName
m["providerAddress"] = n.providerAddress
m["hnsId"] = n.hnsId
m["name"] = n.name
b, err = json.Marshal(m)
if err != nil {
return []byte{}
}
vsidPolicy, err := json.Marshal(hcsshim.VsidPolicy{
Type: "VSID",
VSID: uint(s.vni),
})
return b
}
func (n *network) Index() uint64 {
return n.dbIndex
}
func (n *network) SetIndex(index uint64) {
n.dbIndex = index
n.dbExists = true
}
func (n *network) Exists() bool {
return n.dbExists
}
func (n *network) Skip() bool {
return false
}
func (n *network) SetValue(value []byte) error {
var (
m map[string]interface{}
newNet bool
isMap = true
netJSON = []*subnetJSON{}
)
if err := json.Unmarshal(value, &m); err != nil {
err := json.Unmarshal(value, &netJSON)
if err != nil {
return err
}
isMap = false
subnet.Policies = append(subnet.Policies, vsidPolicy)
subnets = append(subnets, subnet)
}
if len(n.subnets) == 0 {
newNet = true
network := &hcsshim.HNSNetwork{
Name: n.name,
Type: d.Type(),
Subnets: subnets,
NetworkAdapterName: n.interfaceName,
}
if isMap {
if val, ok := m["secure"]; ok {
n.secure = val.(bool)
}
if val, ok := m["providerAddress"]; ok {
n.providerAddress = val.(string)
}
if val, ok := m["interfaceName"]; ok {
n.interfaceName = val.(string)
}
if val, ok := m["hnsId"]; ok {
n.hnsId = val.(string)
}
if val, ok := m["name"]; ok {
n.name = val.(string)
}
bytes, err := json.Marshal(m["subnets"])
if err != nil {
return err
}
if err := json.Unmarshal(bytes, &netJSON); err != nil {
return err
}
configurationb, err := json.Marshal(network)
if err != nil {
return err
}
for _, sj := range netJSON {
subnetIPstr := sj.SubnetIP
gwIPstr := sj.GwIP
vni := sj.Vni
configuration := string(configurationb)
logrus.Infof("HNSNetwork Request =%v", configuration)
subnetIP, _ := types.ParseCIDR(subnetIPstr)
gwIP, _ := types.ParseCIDR(gwIPstr)
if newNet {
s := &subnet{
subnetIP: subnetIP,
gwIP: gwIP,
vni: vni,
}
n.subnets = append(n.subnets, s)
} else {
sNet := n.getMatchingSubnet(subnetIP)
if sNet != nil {
sNet.vni = vni
}
}
hnsresponse, err := hcsshim.HNSNetworkRequest("POST", "", configuration)
if err != nil {
return err
}
n.hnsId = hnsresponse.Id
n.providerAddress = hnsresponse.ManagementIP
return nil
}
func (n *network) DataScope() string {
return datastore.GlobalScope
}
func (n *network) writeToStore() error {
if n.driver.store == nil {
return nil
}
return n.driver.store.PutObjectAtomic(n)
}
func (n *network) releaseVxlanID() ([]uint32, error) {
if len(n.subnets) == 0 {
return nil, nil
}
if n.driver.store != nil {
if err := n.driver.store.DeleteObjectAtomic(n); err != nil {
if err == datastore.ErrKeyModified || err == datastore.ErrKeyNotFound {
// In both the above cases we can safely assume that the key has been removed by some other
// instance and so simply get out of here
return nil, nil
}
return nil, fmt.Errorf("failed to delete network to vxlan id map: %v", err)
}
}
var vnis []uint32
for _, s := range n.subnets {
if n.driver.vxlanIdm != nil {
vni := n.vxlanID(s)
vnis = append(vnis, vni)
n.driver.vxlanIdm.Release(uint64(vni))
}
n.setVxlanID(s, 0)
}
return vnis, nil
}
func (n *network) obtainVxlanID(s *subnet) error {
//return if the subnet already has a vxlan id assigned
if s.vni != 0 {
return nil
}
if n.driver.store == nil {
return fmt.Errorf("no valid vxlan id and no datastore configured, cannot obtain vxlan id")
}
for {
if err := n.driver.store.GetObject(datastore.Key(n.Key()...), n); err != nil {
return fmt.Errorf("getting network %q from datastore failed %v", n.id, err)
}
if s.vni == 0 {
vxlanID, err := n.driver.vxlanIdm.GetID()
if err != nil {
return fmt.Errorf("failed to allocate vxlan id: %v", err)
}
n.setVxlanID(s, uint32(vxlanID))
if err := n.writeToStore(); err != nil {
n.driver.vxlanIdm.Release(uint64(n.vxlanID(s)))
n.setVxlanID(s, 0)
if err == datastore.ErrKeyModified {
continue
}
return fmt.Errorf("network %q failed to update data store: %v", n.id, err)
}
return nil
}
return nil
}
}
// contains return true if the passed ip belongs to one the network's
// subnets
func (n *network) contains(ip net.IP) bool {

View File

@ -1,179 +0,0 @@
package overlay
import (
"fmt"
"net"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/hashicorp/serf/serf"
)
type ovNotify struct {
action string
ep *endpoint
nw *network
}
type logWriter struct{}
func (l *logWriter) Write(p []byte) (int, error) {
str := string(p)
switch {
case strings.Contains(str, "[WARN]"):
logrus.Warn(str)
case strings.Contains(str, "[DEBUG]"):
logrus.Debug(str)
case strings.Contains(str, "[INFO]"):
logrus.Info(str)
case strings.Contains(str, "[ERR]"):
logrus.Error(str)
}
return len(p), nil
}
func (d *driver) serfInit() error {
var err error
config := serf.DefaultConfig()
config.Init()
config.MemberlistConfig.BindAddr = d.bindAddress
d.eventCh = make(chan serf.Event, 4)
config.EventCh = d.eventCh
config.UserCoalescePeriod = 1 * time.Second
config.UserQuiescentPeriod = 50 * time.Millisecond
config.LogOutput = &logWriter{}
config.MemberlistConfig.LogOutput = config.LogOutput
s, err := serf.Create(config)
if err != nil {
return fmt.Errorf("failed to create cluster node: %v", err)
}
defer func() {
if err != nil {
s.Shutdown()
}
}()
d.serfInstance = s
d.notifyCh = make(chan ovNotify)
d.exitCh = make(chan chan struct{})
go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh)
return nil
}
func (d *driver) serfJoin(neighIP string) error {
if neighIP == "" {
return fmt.Errorf("no neighbor to join")
}
if _, err := d.serfInstance.Join([]string{neighIP}, false); err != nil {
return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
neighIP, err)
}
return nil
}
func (d *driver) notifyEvent(event ovNotify) {
ep := event.ep
ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
net.IP(ep.addr.Mask).String(), ep.mac.String())
eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
event.nw.id, ep.id)
if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
logrus.Errorf("Sending user event failed: %v\n", err)
}
}
func (d *driver) processEvent(u serf.UserEvent) {
logrus.Debugf("Received user event name:%s, payload:%s\n", u.Name,
string(u.Payload))
var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
fmt.Printf("Failed to scan name string: %v\n", err)
}
if _, err := fmt.Sscan(string(u.Payload), &action,
&ipStr, &maskStr, &macStr); err != nil {
fmt.Printf("Failed to scan value string: %v\n", err)
}
logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr)
mac, err := net.ParseMAC(macStr)
if err != nil {
logrus.Errorf("Failed to parse mac: %v\n", err)
}
if d.serfInstance.LocalMember().Addr.String() == vtepStr {
return
}
switch action {
case "join":
if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
net.ParseIP(vtepStr), true); err != nil {
logrus.Errorf("Peer add failed in the driver: %v\n", err)
}
case "leave":
if err := d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
net.ParseIP(vtepStr), true); err != nil {
logrus.Errorf("Peer delete failed in the driver: %v\n", err)
}
}
}
func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify,
exitCh chan chan struct{}) {
for {
select {
case notify, ok := <-notifyCh:
if !ok {
break
}
d.notifyEvent(notify)
case ch, ok := <-exitCh:
if !ok {
break
}
if err := d.serfInstance.Leave(); err != nil {
logrus.Errorf("failed leaving the cluster: %v\n", err)
}
d.serfInstance.Shutdown()
close(ch)
return
case e, ok := <-eventCh:
if !ok {
break
}
u, ok := e.(serf.UserEvent)
if !ok {
break
}
d.processEvent(u)
}
}
}
func (d *driver) isSerfAlive() bool {
d.Lock()
serfInstance := d.serfInstance
d.Unlock()
if serfInstance == nil || serfInstance.State() != serf.SerfAlive {
return false
}
return true
}

View File

@ -3,7 +3,7 @@ package overlay
//go:generate protoc -I.:../../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/drivers/overlay,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. overlay.proto
import (
"fmt"
"encoding/json"
"net"
"sync"
@ -12,40 +12,24 @@ import (
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/idm"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/types"
"github.com/hashicorp/serf/serf"
)
const (
networkType = "overlay"
vethPrefix = "veth"
vethLen = 7
vxlanIDStart = 4096
vxlanIDEnd = (1 << 24) - 1
vxlanPort = 4789
vxlanEncap = 50
secureOption = "encrypted"
)
var initVxlanIdm = make(chan (bool), 1)
type driver struct {
eventCh chan serf.Event
notifyCh chan ovNotify
exitCh chan chan struct{}
bindAddress string
advertiseAddress string
neighIP string
config map[string]interface{}
serfInstance *serf.Serf
networks networkTable
store datastore.DataStore
localStore datastore.DataStore
vxlanIdm *idm.Idm
once sync.Once
joinOnce sync.Once
config map[string]interface{}
networks networkTable
store datastore.DataStore
localStore datastore.DataStore
once sync.Once
joinOnce sync.Once
sync.Mutex
}
@ -84,92 +68,75 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
}
}
d.restoreEndpoints()
d.restoreHNSNetworks()
return dc.RegisterDriver(networkType, d, c)
}
// Endpoints are stored in the local store. Restore them and reconstruct the overlay sandbox
func (d *driver) restoreEndpoints() error {
if d.localStore == nil {
logrus.Warnf("Cannot restore overlay endpoints because local datastore is missing")
return nil
}
kvol, err := d.localStore.List(datastore.Key(overlayEndpointPrefix), &endpoint{})
if err != nil && err != datastore.ErrKeyNotFound {
return fmt.Errorf("failed to read overlay endpoint from store: %v", err)
func (d *driver) restoreHNSNetworks() error {
logrus.Infof("Restoring existing overlay networks from HNS into docker")
hnsresponse, err := hcsshim.HNSListNetworkRequest("GET", "", "")
if err != nil {
return err
}
if err == datastore.ErrKeyNotFound {
return nil
}
for _, kvo := range kvol {
ep := kvo.(*endpoint)
n := d.network(ep.nid)
if n == nil || ep.remote {
if !ep.remote {
logrus.Debugf("Network (%s) not found for restored endpoint (%s)", ep.nid[0:7], ep.id[0:7])
logrus.Debugf("Deleting stale overlay endpoint (%s) from store", ep.id[0:7])
}
hcsshim.HNSEndpointRequest("DELETE", ep.profileId, "")
if err := d.deleteEndpointFromStore(ep); err != nil {
logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
}
for _, v := range hnsresponse {
if v.Type != networkType {
continue
}
n.addEndpoint(ep)
logrus.Infof("Restoring overlay network: %s", v.Name)
n := d.convertToOverlayNetwork(&v)
d.addNetwork(n)
//
// We assume that any network will be recreated on daemon restart
// and therefore don't restore hns endpoints for now
//
//n.restoreNetworkEndpoints()
}
return nil
}
// Fini cleans up the driver resources
func Fini(drv driverapi.Driver) {
d := drv.(*driver)
if d.exitCh != nil {
waitCh := make(chan struct{})
d.exitCh <- waitCh
<-waitCh
}
}
func (d *driver) configure() error {
if d.store == nil {
return nil
func (d *driver) convertToOverlayNetwork(v *hcsshim.HNSNetwork) *network {
n := &network{
id: v.Name,
hnsId: v.Id,
driver: d,
endpoints: endpointTable{},
subnets: []*subnet{},
providerAddress: v.ManagementIP,
}
if d.vxlanIdm == nil {
return d.initializeVxlanIdm()
for _, hnsSubnet := range v.Subnets {
vsidPolicy := &hcsshim.VsidPolicy{}
for _, policy := range hnsSubnet.Policies {
if err := json.Unmarshal([]byte(policy), &vsidPolicy); err == nil && vsidPolicy.Type == "VSID" {
break
}
}
gwIP := net.ParseIP(hnsSubnet.GatewayAddress)
localsubnet := &subnet{
vni: uint32(vsidPolicy.VSID),
gwIP: &gwIP,
}
_, subnetIP, err := net.ParseCIDR(hnsSubnet.AddressPrefix)
if err != nil {
logrus.Errorf("Error parsing subnet address %s ", hnsSubnet.AddressPrefix)
continue
}
localsubnet.subnetIP = subnetIP
n.subnets = append(n.subnets, localsubnet)
}
return nil
}
func (d *driver) initializeVxlanIdm() error {
var err error
initVxlanIdm <- true
defer func() { <-initVxlanIdm }()
if d.vxlanIdm != nil {
return nil
}
d.vxlanIdm, err = idm.New(d.store, "vxlan-id", vxlanIDStart, vxlanIDEnd)
if err != nil {
return fmt.Errorf("failed to initialize vxlan id manager: %v", err)
}
return nil
return n
}
func (d *driver) Type() string {
@ -180,122 +147,12 @@ func (d *driver) IsBuiltIn() bool {
return true
}
func validateSelf(node string) error {
advIP := net.ParseIP(node)
if advIP == nil {
return fmt.Errorf("invalid self address (%s)", node)
}
addrs, err := net.InterfaceAddrs()
if err != nil {
return fmt.Errorf("Unable to get interface addresses %v", err)
}
for _, addr := range addrs {
ip, _, err := net.ParseCIDR(addr.String())
if err == nil && ip.Equal(advIP) {
return nil
}
}
return fmt.Errorf("Multi-Host overlay networking requires cluster-advertise(%s) to be configured with a local ip-address that is reachable within the cluster", advIP.String())
}
func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
if self && !d.isSerfAlive() {
if err := validateSelf(advertiseAddress); err != nil {
logrus.Errorf("%s", err.Error())
}
d.Lock()
d.advertiseAddress = advertiseAddress
d.bindAddress = bindAddress
d.Unlock()
// If there is no cluster store there is no need to start serf.
if d.store != nil {
err := d.serfInit()
if err != nil {
logrus.Errorf("initializing serf instance failed: %v", err)
return
}
}
}
d.Lock()
if !self {
d.neighIP = advertiseAddress
}
neighIP := d.neighIP
d.Unlock()
if d.serfInstance != nil && neighIP != "" {
var err error
d.joinOnce.Do(func() {
err = d.serfJoin(neighIP)
if err == nil {
d.pushLocalDb()
}
})
if err != nil {
logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
d.Lock()
d.joinOnce = sync.Once{}
d.Unlock()
return
}
}
}
func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
n := d.network(nid)
if n == nil {
logrus.Debugf("Error pushing local endpoint event for network %s", nid)
return
}
ep := n.endpoint(eid)
if ep == nil {
logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
return
}
if !d.isSerfAlive() {
return
}
d.notifyCh <- ovNotify{
action: action,
nw: n,
ep: ep,
}
}
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) error {
var err error
switch dType {
case discoverapi.NodeDiscovery:
nodeData, ok := data.(discoverapi.NodeDiscoveryData)
if !ok || nodeData.Address == "" {
return fmt.Errorf("invalid discovery data")
}
d.nodeJoin(nodeData.Address, nodeData.BindAddress, nodeData.Self)
case discoverapi.DatastoreConfig:
if d.store != nil {
return types.ForbiddenErrorf("cannot accept datastore configuration: Overlay driver has a datastore configured already")
}
dsc, ok := data.(discoverapi.DatastoreConfigData)
if !ok {
return types.InternalErrorf("incorrect data in datastore configuration: %v", data)
}
d.store, err = datastore.NewDataStoreFromConfig(dsc)
if err != nil {
return types.InternalErrorf("failed to initialize data store: %v", err)
}
default:
}
return nil
return types.NotImplementedErrorf("not implemented")
}
// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
return nil
return types.NotImplementedErrorf("not implemented")
}

View File

@ -7,40 +7,13 @@ import (
"encoding/json"
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/types"
"github.com/Microsoft/hcsshim"
"github.com/docker/libnetwork/types"
)
const ovPeerTable = "overlay_peer_table"
func (d *driver) pushLocalDb() {
if !d.isSerfAlive() {
return
}
d.Lock()
networks := d.networks
d.Unlock()
for _, n := range networks {
n.Lock()
endpoints := n.endpoints
n.Unlock()
for _, ep := range endpoints {
if !ep.remote {
d.notifyCh <- ovNotify{
action: "join",
nw: n,
ep: ep,
}
}
}
}
}
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
@ -59,6 +32,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
logrus.Info("WINOVERLAY: peerAdd: notifying HNS of the REMOTE endpoint")
hnsEndpoint := &hcsshim.HNSEndpoint{
Name: eid,
VirtualNetwork: n.hnsId,
MacAddress: peerMac.String(),
IPAddress: peerIP,
@ -109,10 +83,6 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
}
n.addEndpoint(ep)
if err := d.writeEndpointToStore(ep); err != nil {
return fmt.Errorf("failed to update overlay endpoint %s to local store: %v", ep.id[0:7], err)
}
}
return nil
@ -144,10 +114,6 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas
}
n.deleteEndpoint(eid)
if err := d.deleteEndpointFromStore(ep); err != nil {
logrus.Debugf("Failed to delete stale overlay endpoint (%s) from store", ep.id[0:7])
}
}
return nil

View File

@ -309,7 +309,7 @@ func (d *driver) DeleteNetwork(nid string) error {
_, err = hcsshim.HNSNetworkRequest("DELETE", config.HnsID, "")
if err != nil {
return err
return types.ForbiddenErrorf(err.Error())
}
d.Lock()