mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
![Jana Radhakrishnan](/assets/img/avatar_default.png)
This commit brings in the first implementation of overlay driver which makes use of vxlan tunneling protocol to create logical networks across multiple hosts. This is very much alpha code and should be used for demo and testing purposes only. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
249 lines
5.7 KiB
Go
249 lines
5.7 KiB
Go
package overlay
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/libnetwork/types"
|
|
"github.com/hashicorp/serf/serf"
|
|
)
|
|
|
|
type ovNotify struct {
|
|
action string
|
|
eid types.UUID
|
|
nid types.UUID
|
|
}
|
|
|
|
type logWriter struct{}
|
|
|
|
func (l *logWriter) Write(p []byte) (int, error) {
|
|
str := string(p)
|
|
|
|
switch {
|
|
case strings.Contains(str, "[WARN]"):
|
|
logrus.Warn(str)
|
|
case strings.Contains(str, "[DEBUG]"):
|
|
logrus.Debug(str)
|
|
case strings.Contains(str, "[INFO]"):
|
|
logrus.Info(str)
|
|
case strings.Contains(str, "[ERR]"):
|
|
logrus.Error(str)
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|
|
|
|
func getBindAddr(ifaceName string) (string, error) {
|
|
iface, err := net.InterfaceByName(ifaceName)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
|
|
}
|
|
|
|
addrs, err := iface.Addrs()
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to get interface addresses: %v", err)
|
|
}
|
|
|
|
for _, a := range addrs {
|
|
addr, ok := a.(*net.IPNet)
|
|
if !ok {
|
|
continue
|
|
}
|
|
addrIP := addr.IP
|
|
|
|
if addrIP.IsLinkLocalUnicast() {
|
|
continue
|
|
}
|
|
|
|
return addrIP.String(), nil
|
|
}
|
|
|
|
return "", fmt.Errorf("failed to get bind address")
|
|
}
|
|
|
|
func (d *driver) serfInit() error {
|
|
var err error
|
|
|
|
config := serf.DefaultConfig()
|
|
config.Init()
|
|
if d.ifaceName != "" {
|
|
bindAddr, err := getBindAddr(d.ifaceName)
|
|
if err != nil {
|
|
return fmt.Errorf("getBindAddr error: %v", err)
|
|
}
|
|
config.MemberlistConfig.BindAddr = bindAddr
|
|
}
|
|
|
|
d.eventCh = make(chan serf.Event, 4)
|
|
config.EventCh = d.eventCh
|
|
config.UserCoalescePeriod = 1 * time.Second
|
|
config.UserQuiescentPeriod = 50 * time.Millisecond
|
|
|
|
config.LogOutput = logrus.StandardLogger().Out
|
|
|
|
s, err := serf.Create(config)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create cluster node: %v", err)
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
s.Shutdown()
|
|
}
|
|
}()
|
|
|
|
if d.neighIP != "" {
|
|
if _, err = s.Join([]string{d.neighIP}, false); err != nil {
|
|
return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
|
|
d.neighIP, err)
|
|
}
|
|
}
|
|
|
|
d.serfInstance = s
|
|
|
|
d.notifyCh = make(chan ovNotify)
|
|
d.exitCh = make(chan chan struct{})
|
|
|
|
go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh)
|
|
return nil
|
|
}
|
|
|
|
func (d *driver) notifyEvent(event ovNotify) {
|
|
n := d.network(event.nid)
|
|
ep := n.endpoint(event.eid)
|
|
|
|
ePayload := fmt.Sprintf("%s %s %s", event.action, ep.addr.IP.String(), ep.mac.String())
|
|
eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
|
|
event.nid, event.eid)
|
|
|
|
if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
|
|
fmt.Printf("Sending user event failed: %v\n", err)
|
|
}
|
|
}
|
|
|
|
func (d *driver) processEvent(u serf.UserEvent) {
|
|
fmt.Printf("Received user event name:%s, payload:%s\n", u.Name,
|
|
string(u.Payload))
|
|
|
|
var dummy, action, vtepStr, nid, eid, ipStr, macStr string
|
|
if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
|
|
fmt.Printf("Failed to scan name string: %v\n", err)
|
|
}
|
|
|
|
if _, err := fmt.Sscan(string(u.Payload), &action,
|
|
&ipStr, &macStr); err != nil {
|
|
fmt.Printf("Failed to scan value string: %v\n", err)
|
|
}
|
|
|
|
fmt.Printf("Parsed data = %s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, macStr)
|
|
|
|
mac, err := net.ParseMAC(macStr)
|
|
if err != nil {
|
|
fmt.Printf("Failed to parse mac: %v\n", err)
|
|
}
|
|
|
|
if d.serfInstance.LocalMember().Addr.String() == vtepStr {
|
|
return
|
|
}
|
|
|
|
switch action {
|
|
case "join":
|
|
if err := d.peerAdd(types.UUID(nid), types.UUID(eid), net.ParseIP(ipStr), mac,
|
|
net.ParseIP(vtepStr), true); err != nil {
|
|
fmt.Printf("Peer add failed in the driver: %v\n", err)
|
|
}
|
|
case "leave":
|
|
if err := d.peerDelete(types.UUID(nid), types.UUID(eid), net.ParseIP(ipStr), mac,
|
|
net.ParseIP(vtepStr), true); err != nil {
|
|
fmt.Printf("Peer delete failed in the driver: %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *driver) processQuery(q *serf.Query) {
|
|
fmt.Printf("Received query name:%s, payload:%s\n", q.Name,
|
|
string(q.Payload))
|
|
|
|
var nid, ipStr string
|
|
if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil {
|
|
fmt.Printf("Failed to scan query payload string: %v\n", err)
|
|
}
|
|
|
|
peerMac, vtep, err := d.peerDbSearch(types.UUID(nid), net.ParseIP(ipStr))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
q.Respond([]byte(fmt.Sprintf("%s %s", peerMac.String(), vtep.String())))
|
|
}
|
|
|
|
func (d *driver) resolvePeer(nid types.UUID, peerIP net.IP) (net.HardwareAddr, net.IP, error) {
|
|
qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String())
|
|
resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
|
|
}
|
|
|
|
respCh := resp.ResponseCh()
|
|
select {
|
|
case r := <-respCh:
|
|
var macStr, vtepStr string
|
|
if _, err := fmt.Sscan(string(r.Payload), &macStr, &vtepStr); err != nil {
|
|
return nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
|
|
}
|
|
|
|
mac, err := net.ParseMAC(macStr)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to parse mac: %v", err)
|
|
}
|
|
|
|
return mac, net.ParseIP(vtepStr), nil
|
|
|
|
case <-time.After(time.Second):
|
|
return nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
|
|
}
|
|
}
|
|
|
|
func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify,
|
|
exitCh chan chan struct{}) {
|
|
|
|
for {
|
|
select {
|
|
case notify, ok := <-notifyCh:
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
d.notifyEvent(notify)
|
|
case ch, ok := <-exitCh:
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
if err := d.serfInstance.Leave(); err != nil {
|
|
fmt.Printf("failed leaving the cluster: %v\n", err)
|
|
}
|
|
|
|
d.serfInstance.Shutdown()
|
|
close(ch)
|
|
return
|
|
case e, ok := <-eventCh:
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
if e.EventType() == serf.EventQuery {
|
|
d.processQuery(e.(*serf.Query))
|
|
break
|
|
}
|
|
|
|
u, ok := e.(serf.UserEvent)
|
|
if !ok {
|
|
break
|
|
}
|
|
d.processEvent(u)
|
|
}
|
|
}
|
|
}
|